## What changes were proposed in this pull request?
This pr supported Date/Timestamp in a JDBC partition column (a numeric column is only supported in the master). This pr also modified code to verify a partition column type;
```
val jdbcTable = spark.read
.option("partitionColumn", "text")
.option("lowerBound", "aaa")
.option("upperBound", "zzz")
.option("numPartitions", 2)
.jdbc("jdbc:postgresql:postgres", "t", options)
// with this pr
org.apache.spark.sql.AnalysisException: Partition column type should be numeric, date, or timestamp, but string found.;
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.verifyAndGetNormalizedPartitionColumn(JDBCRelation.scala:165)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.columnPartition(JDBCRelation.scala:85)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:36)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:317)
// without this pr
java.lang.NumberFormatException: For input string: "aaa"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:589)
at java.lang.Long.parseLong(Long.java:631)
at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
```
Closes#19999
## How was this patch tested?
Added tests in `JDBCSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#21834 from maropu/SPARK-22814.
## What changes were proposed in this pull request?
Upgrade Apache Avro from 1.7.7 to 1.8.2. The major new features:
1. More logical types. From the spec of 1.8.2 https://avro.apache.org/docs/1.8.2/spec.html#Logical+Types we can see comparing to [1.7.7](https://avro.apache.org/docs/1.7.7/spec.html#Logical+Types), the new version support:
- Date
- Time (millisecond precision)
- Time (microsecond precision)
- Timestamp (millisecond precision)
- Timestamp (microsecond precision)
- Duration
2. Single-object encoding: https://avro.apache.org/docs/1.8.2/spec.html#single_object_encoding
This PR aims to update Apache Spark to support these new features.
## How was this patch tested?
Unit test
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21761 from gengliangwang/upgrade_avro_1.8.
## What changes were proposed in this pull request?
Looks we intentionally set `null` for upper/lower bounds for complex types and don't use it. However, these look used in in-memory partition pruning, which ends up with incorrect results.
This PR proposes to explicitly whitelist the supported types.
```scala
val df = Seq(Array("a", "b"), Array("c", "d")).toDF("arrayCol")
df.cache().filter("arrayCol > array('a', 'b')").show()
```
```scala
val df = sql("select cast('a' as binary) as a")
df.cache().filter("a == cast('a' as binary)").show()
```
**Before:**
```
+--------+
|arrayCol|
+--------+
+--------+
```
```
+---+
| a|
+---+
+---+
```
**After:**
```
+--------+
|arrayCol|
+--------+
| [c, d]|
+--------+
```
```
+----+
| a|
+----+
|[61]|
+----+
```
## How was this patch tested?
Unit tests were added and manually tested.
Author: hyukjinkwon <gurwls223@apache.org>
Closes#21882 from HyukjinKwon/stats-filter.
## What changes were proposed in this pull request?
Implements INTERSECT ALL clause through query rewrites using existing operators in Spark. Please refer to [Link](https://drive.google.com/open?id=1nyW0T0b_ajUduQoPgZLAsyHK8s3_dko3ulQuxaLpUXE) for the design.
Input Query
``` SQL
SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2
```
Rewritten Query
```SQL
SELECT c1
FROM (
SELECT replicate_row(min_count, c1)
FROM (
SELECT c1,
IF (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS min_count
FROM (
SELECT c1, count(vcol1) as vcol1_cnt, count(vcol2) as vcol2_cnt
FROM (
SELECT c1, true as vcol1, null as vcol2 FROM ut1
UNION ALL
SELECT c1, null as vcol1, true as vcol2 FROM ut2
) AS union_all
GROUP BY c1
HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1
)
)
)
```
## How was this patch tested?
Added test cases in SQLQueryTestSuite, DataFrameSuite, SetOperationSuite
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#21886 from dilipbiswal/dkb_intersect_all_final.
## What changes were proposed in this pull request?
This PR propose to address https://github.com/apache/spark/pull/21318#discussion_r187843125 comment.
This is rather a nit but looks we better avoid to update the link for each release since it always points the latest (it doesn't look like worth enough updating release guide on the other hand as well).
## How was this patch tested?
N/A
Author: hyukjinkwon <gurwls223@apache.org>
Closes#21907 from HyukjinKwon/minor-fix.
When join key is long or int in broadcast join, Spark will use `LongToUnsafeRowMap` to store key-values of the table witch will be broadcasted. But, when `LongToUnsafeRowMap` is broadcasted to executors, and it is too big to hold in memory, it will be stored in disk. At that time, because `write` uses a variable `cursor` to determine how many bytes in `page` of `LongToUnsafeRowMap` will be write out and the `cursor` was not restore when deserializing, executor will write out nothing from page into disk.
## What changes were proposed in this pull request?
Restore cursor value when deserializing.
Author: liulijia <liutang123@yeah.net>
Closes#21772 from liutang123/SPARK-24809.
## What changes were proposed in this pull request?
Implements EXCEPT ALL clause through query rewrites using existing operators in Spark. In this PR, an internal UDTF (replicate_rows) is added to aid in preserving duplicate rows. Please refer to [Link](https://drive.google.com/open?id=1nyW0T0b_ajUduQoPgZLAsyHK8s3_dko3ulQuxaLpUXE) for the design.
**Note** This proposed UDTF is kept as a internal function that is purely used to aid with this particular rewrite to give us flexibility to change to a more generalized UDTF in future.
Input Query
``` SQL
SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2
```
Rewritten Query
```SQL
SELECT c1
FROM (
SELECT replicate_rows(sum_val, c1)
FROM (
SELECT c1, sum_val
FROM (
SELECT c1, sum(vcol) AS sum_val
FROM (
SELECT 1L as vcol, c1 FROM ut1
UNION ALL
SELECT -1L as vcol, c1 FROM ut2
) AS union_all
GROUP BY union_all.c1
)
WHERE sum_val > 0
)
)
```
## How was this patch tested?
Added test cases in SQLQueryTestSuite, DataFrameSuite and SetOperationSuite
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#21857 from dilipbiswal/dkb_except_all_final.
## What changes were proposed in this pull request?
This PR adds a new collection function: shuffle. It generates a random permutation of the given array. This implementation uses the "inside-out" version of Fisher-Yates algorithm.
## How was this patch tested?
New tests are added to CollectionExpressionsSuite.scala and DataFrameFunctionsSuite.scala.
Author: Takuya UESHIN <ueshin@databricks.com>
Author: pkuwm <ihuizhi.lu@gmail.com>
Closes#21802 from ueshin/issues/SPARK-23928/shuffle.
## What changes were proposed in this pull request?
Add a JDBC Option "pushDownPredicate" (default `true`) to allow/disallow predicate push-down in JDBC data source.
## How was this patch tested?
Add a test in `JDBCSuite`
Author: maryannxue <maryannxue@apache.org>
Closes#21875 from maryannxue/spark-24288.
## What changes were proposed in this pull request?
AnalysisBarrier was introduced in SPARK-20392 to improve analysis speed (don't re-analyze nodes that have already been analyzed).
Before AnalysisBarrier, we already had some infrastructure in place, with analysis specific functions (resolveOperators and resolveExpressions). These functions do not recursively traverse down subplans that are already analyzed (with a mutable boolean flag _analyzed). The issue with the old system was that developers started using transformDown, which does a top-down traversal of the plan tree, because there was not top-down resolution function, and as a result analyzer performance became pretty bad.
In order to fix the issue in SPARK-20392, AnalysisBarrier was introduced as a special node and for this special node, transform/transformUp/transformDown don't traverse down. However, the introduction of this special node caused a lot more troubles than it solves. This implicit node breaks assumptions and code in a few places, and it's hard to know when analysis barrier would exist, and when it wouldn't. Just a simple search of AnalysisBarrier in PR discussions demonstrates it is a source of bugs and additional complexity.
Instead, this pull request removes AnalysisBarrier and reverts back to the old approach. We added infrastructure in tests that fail explicitly if transform methods are used in the analyzer.
## How was this patch tested?
Added a test suite AnalysisHelperSuite for testing the resolve* methods and transform* methods.
Author: Reynold Xin <rxin@databricks.com>
Author: Xiao Li <gatorsmile@gmail.com>
Closes#21822 from rxin/SPARK-24865.
## What changes were proposed in this pull request?
In most cases, we should use `spark.sessionState.newHadoopConf()` instead of `sparkContext.hadoopConfiguration`, so that the hadoop configurations specified in Spark session
configuration will come into effect.
Add a rule matching `spark.sparkContext.hadoopConfiguration` or `spark.sqlContext.sparkContext.hadoopConfiguration` to prevent the usage.
## How was this patch tested?
Unit test
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21873 from gengliangwang/linterRule.
## What changes were proposed in this pull request?
This is an extension to the original PR, in which rule exclusion did not work for classes derived from Optimizer, e.g., SparkOptimizer.
To solve this issue, Optimizer and its derived classes will define/override `defaultBatches` and `nonExcludableRules` in order to define its default rule set as well as rules that cannot be excluded by the SQL config. In the meantime, Optimizer's `batches` method is dedicated to the rule exclusion logic and is defined "final".
## How was this patch tested?
Added UT.
Author: maryannxue <maryannxue@apache.org>
Closes#21876 from maryannxue/rule-exclusion.
## What changes were proposed in this pull request?
This PR aims to the followings.
1. Like `com.databricks.spark.csv` mapping, we had better map `com.databricks.spark.avro` to built-in Avro data source.
2. Remove incorrect error message, `Please find an Avro package at ...`.
## How was this patch tested?
Pass the newly added tests.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#21878 from dongjoon-hyun/SPARK-24924.
## What changes were proposed in this pull request?
If we use `reverse` function for array type of primitive type containing `null` and the child array is `UnsafeArrayData`, the function returns a wrong result because `UnsafeArrayData` doesn't define the behavior of re-assignment, especially we can't set a valid value after we set `null`.
## How was this patch tested?
Added some tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#21830 from ueshin/issues/SPARK-24878/fix_reverse.
## What changes were proposed in this pull request?
```Scala
val udf1 = udf({(x: Int, y: Int) => x + y})
val df = spark.range(0, 3).toDF("a")
.withColumn("b", udf1($"a", udf1($"a", lit(10))))
df.cache()
df.write.saveAsTable("t")
```
Cache is not being used because the plans do not match with the cached plan. This is a regression caused by the changes we made in AnalysisBarrier, since not all the Analyzer rules are idempotent.
## How was this patch tested?
Added a test.
Also found a bug in the DSV1 write path. This is not a regression. Thus, opened a separate JIRA https://issues.apache.org/jira/browse/SPARK-24869
Author: Xiao Li <gatorsmile@gmail.com>
Closes#21821 from gatorsmile/testMaster22.
## What changes were proposed in this pull request?
Besides spark setting spark.sql.sources.partitionOverwriteMode also allow setting partitionOverWriteMode per write
## How was this patch tested?
Added unit test in InsertSuite
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Koert Kuipers <koert@tresata.com>
Closes#21818 from koertkuipers/feat-partition-overwrite-mode-per-write.
## What changes were proposed in this pull request?
In the PR, I propose to extend the `StructType`/`StructField` classes by new method `toDDL` which converts a value of the `StructType`/`StructField` type to a string formatted in DDL style. The resulted string can be used in a table creation.
The `toDDL` method of `StructField` is reused in `SHOW CREATE TABLE`. In this way the PR fixes the bug of unquoted names of nested fields.
## How was this patch tested?
I add a test for checking the new method and 2 round trip tests: `fromDDL` -> `toDDL` and `toDDL` -> `fromDDL`
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21803 from MaxGekk/to-ddl.
## What changes were proposed in this pull request?
Improvement `IN` predicate type mismatched message:
```sql
Mismatched columns:
[(, t, 4, ., `, t, 4, a, `, :, d, o, u, b, l, e, ,, , t, 5, ., `, t, 5, a, `, :, d, e, c, i, m, a, l, (, 1, 8, ,, 0, ), ), (, t, 4, ., `, t, 4, c, `, :, s, t, r, i, n, g, ,, , t, 5, ., `, t, 5, c, `, :, b, i, g, i, n, t, )]
```
After this patch:
```sql
Mismatched columns:
[(t4.`t4a`:double, t5.`t5a`:decimal(18,0)), (t4.`t4c`:string, t5.`t5c`:bigint)]
```
## How was this patch tested?
unit tests
Author: Yuming Wang <yumwang@ebay.com>
Closes#21863 from wangyum/SPARK-18874.
## What changes were proposed in this pull request?
Add support for custom encoding on csv writer, see https://issues.apache.org/jira/browse/SPARK-19018
## How was this patch tested?
Added two unit tests in CSVSuite
Author: crafty-coder <carlospb86@gmail.com>
Author: Carlos <crafty-coder@users.noreply.github.com>
Closes#20949 from crafty-coder/master.
## What changes were proposed in this pull request?
Thanks to henryr for the original idea at https://github.com/apache/spark/pull/21049
Description from the original PR :
Subqueries (at least in SQL) have 'bag of tuples' semantics. Ordering
them is therefore redundant (unless combined with a limit).
This patch removes the top sort operators from the subquery plans.
This closes https://github.com/apache/spark/pull/21049.
## How was this patch tested?
Added test cases in SubquerySuite to cover in, exists and scalar subqueries.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#21853 from dilipbiswal/SPARK-23957.
## What changes were proposed in this pull request?
When `trueValue` and `falseValue` are semantic equivalence, the condition expression in `if` can be removed to avoid extra computation in runtime.
## How was this patch tested?
Test added.
Author: DB Tsai <d_tsai@apple.com>
Closes#21848 from dbtsai/short-circuit-if.
## What changes were proposed in this pull request?
The HandleNullInputsForUDF would always add a new `If` node every time it is applied. That would cause a difference between the same plan being analyzed once and being analyzed twice (or more), thus raising issues like plan not matched in the cache manager. The solution is to mark the arguments as null-checked, which is to add a "KnownNotNull" node above those arguments, when adding the UDF under an `If` node, because clearly the UDF will not be called when any of those arguments is null.
## How was this patch tested?
Add new tests under sql/UDFSuite and AnalysisSuite.
Author: maryannxue <maryannxue@apache.org>
Closes#21851 from maryannxue/spark-24891.
## What changes were proposed in this pull request?
This updates the DataSourceV2 API to use InternalRow instead of Row for the default case with no scan mix-ins.
Support for readers that produce Row is added through SupportsDeprecatedScanRow, which matches the previous API. Readers that used Row now implement this class and should be migrated to InternalRow.
Readers that previously implemented SupportsScanUnsafeRow have been migrated to use no SupportsScan mix-ins and produce InternalRow.
## How was this patch tested?
This uses existing tests.
Author: Ryan Blue <blue@apache.org>
Closes#21118 from rdblue/SPARK-23325-datasource-v2-internal-row.
## What changes were proposed in this pull request?
Modified the canonicalized to not case-insensitive.
Before the PR, cache can't work normally if there are case letters in SQL,
for example:
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("select key, sum(case when Key > 0 then 1 else 0 end) as positiveNum " +
"from src group by key").cache().createOrReplaceTempView("src_cache")
sql(
s"""select a.key
from
(select key from src_cache where positiveNum = 1)a
left join
(select key from src_cache )b
on a.key=b.key
""").explain
The physical plan of the sql is:
![image](https://user-images.githubusercontent.com/26834091/42979518-3decf0fa-8c05-11e8-9837-d5e4c334cb1f.png)
The subquery "select key from src_cache where positiveNum = 1" on the left of join can use the cache data, but the subquery "select key from src_cache" on the right of join cannot use the cache data.
## How was this patch tested?
new added test
Author: 10129659 <chen.yanshan@zte.com.cn>
Closes#21823 from eatoncys/canonicalized.
## What changes were proposed in this pull request?
Streaming queries with watermarks do not work with Trigger.Once because of the following.
- Watermark is updated in the driver memory after a batch completes, but it is persisted to checkpoint (in the offset log) only when the next batch is planned
- In trigger.once, the query terminated as soon as one batch has completed. Hence, the updated watermark is never persisted anywhere.
The simple solution is to persist the updated watermark value in the commit log when a batch is marked as completed. Then the next batch, in the next trigger.once run can pick it up from the commit log.
## How was this patch tested?
new unit tests
Co-authored-by: Tathagata Das <tathagata.das1565gmail.com>
Co-authored-by: c-horn <chorn4033gmail.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#21746 from tdas/SPARK-24699.
This commit adds the `cascadeTruncate` option to the JDBC datasource
API, for databases that support this functionality (PostgreSQL and
Oracle at the moment). This allows for applying a cascading truncate
that affects tables that have foreign key constraints on the table
being truncated.
## What changes were proposed in this pull request?
Add `cascadeTruncate` option to JDBC datasource API. Allow this to affect the
`TRUNCATE` query for databases that support this option.
## How was this patch tested?
Existing tests for `truncateQuery` were updated. Also, an additional test was added
to ensure that the correct syntax was applied, and that enabling the config for databases
that do not support this option does not result in invalid queries.
Author: Daniel van der Ende <daniel.vanderende@gmail.com>
Closes#20057 from danielvdende/SPARK-22880.
## What changes were proposed in this pull request?
### What's problem?
In some cases, sub scalar query could throw a NPE, which is caused in execution side.
```
java.lang.NullPointerException
at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:169)
at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:526)
at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:159)
at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$3.apply(QueryPlan.scala:225)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$3.apply(QueryPlan.scala:225)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:225)
at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:258)
at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:58)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
at scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:364)
at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40)
at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:139)
at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:135)
at scala.collection.mutable.HashMap.findEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.get(HashMap.scala:70)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:56)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:97)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:98)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:98)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:98)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1102)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1154)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:270)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:319)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:308)
at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:181)
at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:71)
at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:70)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
### How does this happen?
Here looks what happen now:
1. Sub scalar query was made (for instance `SELECT (SELECT id FROM foo)`).
2. Try to extract some common expressions (via `CodeGenerator.subexpressionElimination`) so that it can generates some common codes and can be reused.
3. During this, seems it extracts some expressions that can be reused (via `EquivalentExpressions.addExprTree`)
b2deef64f6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala (L1102)
4. During this, if the hash (`EquivalentExpressions.Expr.hashCode`) happened to be the same at `EquivalentExpressions.addExpr` anyhow, `EquivalentExpressions.Expr.equals` is called to identify object in the same hash, which eventually calls `semanticEquals` in `ScalarSubquery`
087879a77a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala (L54)087879a77a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala (L36)
5. `ScalarSubquery`'s `semanticEquals` needs `SubqueryExec`'s `sameResult`
77a2fc5b52/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala (L58)
6. `SubqueryExec`'s `sameResult` requires a canonicalized plan which calls `FileSourceScanExec`'s `doCanonicalize`
e008ad1752/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala (L258)
7. In `FileSourceScanExec`'s `doCanonicalize`, `FileSourceScanExec`'s `relation` is required but seems `transient` so it becomes `null`.
e76b0124fb/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala (L527)e76b0124fb/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala (L160)
8. NPE is thrown.
\*1. driver side
\*2., 3., 4., 5., 6., 7., 8. executor side
Note that most of cases, it looks fine because we will usually call:
087879a77a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala (L40)
which make a canonicalized plan via:
b045315e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala (L192)77a2fc5b52/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala (L52)
### How to reproduce?
This looks what happened now. I can reproduce this by a bit of messy way:
```diff
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
index 8d06804ce1e..d25fc9a7ba9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
-37,7 +37,9 class EquivalentExpressions {
case _ => false
}
- override def hashCode: Int = e.semanticHash()
+ override def hashCode: Int = {
+ 1
+ }
}
```
```scala
spark.range(1).write.mode("overwrite").parquet("/tmp/foo")
spark.read.parquet("/tmp/foo").createOrReplaceTempView("foo")
spark.conf.set("spark.sql.codegen.wholeStage", false)
sql("SELECT (SELECT id FROM foo) == (SELECT id FROM foo)").collect()
```
### How does this PR fix?
- Make all variables that access to `FileSourceScanExec`'s `relation` as `lazy val` so that we avoid NPE. This is a temporary fix.
- Allow `makeCopy` in `SparkPlan` without Spark session too. This looks still able to be accessed within executor side. For instance:
```
at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:70)
at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:47)
at org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:233)
at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:243)
at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:258)
at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:58)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
at scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:364)
at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40)
at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:139)
at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:135)
at scala.collection.mutable.HashMap.findEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.get(HashMap.scala:70)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:54)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:95)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:96)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1102)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1154)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:270)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:319)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:308)
at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:181)
at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:71)
at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:70)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
This PR takes over https://github.com/apache/spark/pull/20856.
## How was this patch tested?
Manually tested and unit test was added.
Closes#20856
Author: hyukjinkwon <gurwls223@apache.org>
Closes#21815 from HyukjinKwon/SPARK-23731.
## What changes were proposed in this pull request?
Enhances the parser and analyzer to support ANSI compliant syntax for GROUPING SET. As part of this change we derive the grouping expressions from user supplied groupings in the grouping sets clause.
```SQL
SELECT c1, c2, max(c3)
FROM t1
GROUP BY GROUPING SETS ((c1), (c1, c2))
```
## How was this patch tested?
Added tests in SQLQueryTestSuite and ResolveGroupingAnalyticsSuite.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes#21813 from dilipbiswal/spark-24424.
## What changes were proposed in this pull request?
As stated in https://github.com/apache/spark/pull/21321, in the error messages we should use `catalogString`. This is not the case, as SPARK-22893 used `simpleString` in order to have the same representation everywhere and it missed some places.
The PR unifies the messages using alway the `catalogString` representation of the dataTypes in the messages.
## How was this patch tested?
existing/modified UTs
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#21804 from mgaido91/SPARK-24268_catalog.
## What changes were proposed in this pull request?
`RateSourceSuite` may leave garbage files under `sql/core/dummy`, we should use a corrected temp directory
## How was this patch tested?
test only
Author: Wenchen Fan <wenchen@databricks.com>
Closes#21817 from cloud-fan/minor.
## What changes were proposed in this pull request?
Currently, the group state of user-defined-type is encoded as top-level columns in the UnsafeRows stores in the state store. The timeout timestamp is also saved as (when needed) as the last top-level column. Since the group state is serialized to top-level columns, you cannot save "null" as a value of state (setting null in all the top-level columns is not equivalent). So we don't let the user set the timeout without initializing the state for a key. Based on user experience, this leads to confusion.
This PR is to change the row format such that the state is saved as nested columns. This would allow the state to be set to null, and avoid these confusing corner cases. However, queries recovering from existing checkpoint will use the previous format to maintain compatibility with existing production queries.
## How was this patch tested?
Refactored existing end-to-end tests and added new tests for explicitly testing obj-to-row conversion for both state formats.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#21739 from tdas/SPARK-22187-1.
## What changes were proposed in this pull request?
Currently the same Parquet footer is read twice in the function `buildReaderWithPartitionValues` of ParquetFileFormat if filter push down is enabled.
Fix it with simple changes.
## How was this patch tested?
Unit test
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes#21814 from gengliangwang/parquetFooter.
## What changes were proposed in this pull request?
This patch proposes breaking down configuration of retaining batch size on state into two pieces: files and in memory (cache). While this patch reuses existing configuration for files, it introduces new configuration, "spark.sql.streaming.maxBatchesToRetainInMemory" to configure max count of batch to retain in memory.
## How was this patch tested?
Apply this patch on top of SPARK-24441 (https://github.com/apache/spark/pull/21469), and manually tested in various workloads to ensure overall size of states in memory is around 2x or less of the size of latest version of state, while it was 10x ~ 80x before applying the patch.
Author: Jungtaek Lim <kabhwan@gmail.com>
Closes#21700 from HeartSaVioR/SPARK-24717.
## What changes were proposed in this pull request?
It's a little tricky and fragile to use a dummy filter to switch codegen on/off. For now we should use local/cached relation to switch. In the future when we are able to use a config to turn off codegen, we shall use that.
## How was this patch tested?
test only PR.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#21795 from cloud-fan/follow.
## What changes were proposed in this pull request?
1. Extend the Parser to enable parsing a column list as the pivot column.
2. Extend the Parser and the Pivot node to enable parsing complex expressions with aliases as the pivot value.
3. Add type check and constant check in Analyzer for Pivot node.
## How was this patch tested?
Add tests in pivot.sql
Author: maryannxue <maryannxue@apache.org>
Closes#21720 from maryannxue/spark-24164.
## What changes were proposed in this pull request?
In DatasetSuite.scala, in the 1299 line,
test("SPARK-19896: cannot have circular references in in case class") ,
there are duplicate words "in in". We can get rid of one.
## How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: 韩田田00222924 <han.tiantian@zte.com.cn>
Closes#21767 from httfighter/inin.
## What changes were proposed in this pull request?
This pr fixes lint-java and Scala 2.12 build.
lint-java:
```
[ERROR] src/test/resources/log4j.properties:[0] (misc) NewlineAtEndOfFile: File does not end with a newline.
```
Scala 2.12 build:
```
[error] /.../sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala:121: overloaded method value addTaskCompletionListener with alternatives:
[error] (f: org.apache.spark.TaskContext => Unit)org.apache.spark.TaskContext <and>
[error] (listener: org.apache.spark.util.TaskCompletionListener)org.apache.spark.TaskContext
[error] cannot be applied to (org.apache.spark.TaskContext => java.util.List[Runnable])
[error] context.addTaskCompletionListener { ctx =>
[error] ^
```
## How was this patch tested?
Manually executed lint-java and Scala 2.12 build in my local environment.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#21801 from ueshin/issues/SPARK-24386_24768/fix_build.
## What changes were proposed in this pull request?
This issue aims to upgrade Apache ORC library from 1.4.4 to 1.5.2 in order to bring the following benefits into Apache Spark.
- [ORC-91](https://issues.apache.org/jira/browse/ORC-91) Support for variable length blocks in HDFS (The current space wasted in ORC to padding is known to be 5%.)
- [ORC-344](https://issues.apache.org/jira/browse/ORC-344) Support for using Decimal64ColumnVector
In addition to that, Apache Hive 3.1 and 3.2 will use ORC 1.5.1 ([HIVE-19669](https://issues.apache.org/jira/browse/HIVE-19465)) and 1.5.2 ([HIVE-19792](https://issues.apache.org/jira/browse/HIVE-19792)) respectively. This will improve the compatibility between Apache Spark and Apache Hive by sharing the common library.
## How was this patch tested?
Pass the Jenkins with all existing tests.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#21582 from dongjoon-hyun/SPARK-24576.
## What changes were proposed in this pull request?
Remove the non-negative checks of window start time to make window support negative start time, and add a check to guarantee the absolute value of start time is less than slide duration.
## How was this patch tested?
New unit tests.
Author: HanShuliang <kevinzwx1992@gmail.com>
Closes#18903 from KevinZwx/dev.
## What changes were proposed in this pull request?
We have some functions which need to aware the nullabilities of all children, such as `CreateArray`, `CreateMap`, `Concat`, and so on. Currently we add casts to fix the nullabilities, but the casts might be removed during the optimization phase.
After the discussion, we decided to not add extra casts for just fixing the nullabilities of the nested types, but handle them by functions themselves.
## How was this patch tested?
Modified and added some tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#21704 from ueshin/issues/SPARK-24734/concat_containsnull.
## What changes were proposed in this pull request?
Support Decimal type push down to the parquet data sources.
The Decimal comparator used is: [`BINARY_AS_SIGNED_INTEGER_COMPARATOR`](c6764c4a08/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java (L224-L292)).
## How was this patch tested?
unit tests and manual tests.
**manual tests**:
```scala
spark.range(10000000).selectExpr("id", "cast(id as decimal(9)) as d1", "cast(id as decimal(9, 2)) as d2", "cast(id as decimal(18)) as d3", "cast(id as decimal(18, 4)) as d4", "cast(id as decimal(38)) as d5", "cast(id as decimal(38, 18)) as d6").coalesce(1).write.option("parquet.block.size", 1048576).parquet("/tmp/spark/parquet/decimal")
val df = spark.read.parquet("/tmp/spark/parquet/decimal/")
spark.sql("set spark.sql.parquet.filterPushdown.decimal=true")
// Only read about 1 MB data
df.filter("d2 = 10000").show
// Only read about 1 MB data
df.filter("d4 = 10000").show
spark.sql("set spark.sql.parquet.filterPushdown.decimal=false")
// Read 174.3 MB data
df.filter("d2 = 10000").show
// Read 174.3 MB data
df.filter("d4 = 10000").show
```
Author: Yuming Wang <yumwang@ebay.com>
Closes#21556 from wangyum/SPARK-24549.
## What changes were proposed in this pull request?
In the PR, I propose to move `testFile()` to the common trait `SQLTestUtilsBase` and wrap test files in `AvroSuite` by the method `testFile()` which returns full paths to test files in the resource folder.
Author: Maxim Gekk <maxim.gekk@databricks.com>
Closes#21773 from MaxGekk/test-file.
## What changes were proposed in this pull request?
This pr modified code to project required data from CSV parsed data when column pruning disabled.
In the current master, an exception below happens if `spark.sql.csv.parser.columnPruning.enabled` is false. This is because required formats and CSV parsed formats are different from each other;
```
./bin/spark-shell --conf spark.sql.csv.parser.columnPruning.enabled=false
scala> val dir = "/tmp/spark-csv/csv"
scala> spark.range(10).selectExpr("id % 2 AS p", "id").write.mode("overwrite").partitionBy("p").csv(dir)
scala> spark.read.csv(dir).selectExpr("sum(p)").collect()
18/06/25 13:48:46 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 7)
java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt(rows.scala:41)
...
```
## How was this patch tested?
Added tests in `CSVSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#21657 from maropu/SPARK-24676.
## What changes were proposed in this pull request?
`Timestamp` support pushdown to parquet data source.
Only `TIMESTAMP_MICROS` and `TIMESTAMP_MILLIS` support push down.
## How was this patch tested?
unit tests and benchmark tests
Author: Yuming Wang <yumwang@ebay.com>
Closes#21741 from wangyum/SPARK-24718.
## What changes were proposed in this pull request?
The original pr is: https://github.com/apache/spark/pull/18424
Add a new optimizer rule to convert an IN predicate to an equivalent Parquet filter and add `spark.sql.parquet.pushdown.inFilterThreshold` to control limit thresholds. Different data types have different limit thresholds, this is a copy of data for reference:
Type | limit threshold
-- | --
string | 370
int | 210
long | 285
double | 270
float | 220
decimal | Won't provide better performance before [SPARK-24549](https://issues.apache.org/jira/browse/SPARK-24549)
## How was this patch tested?
unit tests and manual tests
Author: Yuming Wang <yumwang@ebay.com>
Closes#21603 from wangyum/SPARK-17091.
## What changes were proposed in this pull request?
When we use a reference from Dataset in filter or sort, which was not used in the prior select, an AnalysisException occurs, e.g.,
```scala
val df = Seq(("test1", 0), ("test2", 1)).toDF("name", "id")
df.select(df("name")).filter(df("id") === 0).show()
```
```scala
org.apache.spark.sql.AnalysisException: Resolved attribute(s) id#6 missing from name#5 in operator !Filter (id#6 = 0).;;
!Filter (id#6 = 0)
+- AnalysisBarrier
+- Project [name#5]
+- Project [_1#2 AS name#5, _2#3 AS id#6]
+- LocalRelation [_1#2, _2#3]
```
This change updates the rule `ResolveMissingReferences` so `Filter` and `Sort` with non-empty `missingInputs` will also be transformed.
## How was this patch tested?
Added tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#21745 from viirya/SPARK-24781.