### What changes were proposed in this pull request?
This PR aims to add `sinkParameter` to check sink options robustly and independently in `DataStreamReaderWriterSuite`.
### Why are the changes needed?
`LastOptions.parameters` is designed to catch three cases: `sourceSchema`, `createSource`, and `createSink`. However, `StreamingQuery.stop` invokes `queryExecutionThread.join`, `runStream`, and `createSource` immediately, which resets the options stored by `createSink`.
To catch the `createSink` options, the test suite currently uses a workaround pattern. However, we sometimes observed flakiness with this pattern. If we store the `createSink` options separately, we no longer need the workaround and can eliminate the flakiness.
```scala
val query = df.writeStream
  ...
  .start()
assert(LastOptions.parameters(..))
query.stop()
```
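A hedged sketch of the split storage (field names are assumed, not necessarily the actual patch):
```scala
object LastOptions {
  // Overwritten when createSource runs during/after query start or stop.
  var parameters: Map[String, String] = Map.empty
  // Written only by createSink, so sink options survive the stop() sequence
  // and can be asserted without the start()/assert/stop() workaround.
  var sinkParameters: Map[String, String] = Map.empty
}
```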
### Does this PR introduce _any_ user-facing change?
No. This is a test-only change.
### How was this patch tested?
Pass the newly updated test case.
Closes#29730 from dongjoon-hyun/SPARK-32845.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This is a follow-up to https://github.com/apache/spark/pull/29572.
LeftAnti SortMergeJoin should not buffer all matching right-side rows when the bound condition is empty; this is unnecessary and can lead to performance degradation, especially when spilling happens.
### Why are the changes needed?
Performance improvement.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New UT.
Closes#29727 from peter-toth/SPARK-32730-improve-leftsemi-sortmergejoin-followup.
Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/29328
In https://github.com/apache/spark/pull/29328, we forbade specifying both the path option and the path parameter. However, this breaks some use cases:
```
val dfr = spark.read.format(...).option(...)
dfr.load(path1).xxx
dfr.load(path2).xxx
```
The reason is that `load` has a side effect: it sets the path option on the `DataFrameReader` instance. The next time you call `load`, Spark fails because both the path option and the path parameter are specified.
This PR removes the side effect so that `save`/`load`/`start` no longer set the path option.
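For illustration, the recovered pattern looks like this (a hypothetical JSON source and option):
```scala
val dfr = spark.read.format("json").option("multiLine", "true")
dfr.load("/tmp/path1").count()  // no longer stores the path back as an option
dfr.load("/tmp/path2").count()  // previously failed: path option and path parameter both set
```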
### Why are the changes needed?
Recover the use cases described above.
### Does this PR introduce _any_ user-facing change?
Yes, some use cases fail before this PR, and can run successfully after this PR.
### How was this patch tested?
new tests
Closes#29723 from cloud-fan/df.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
fix typo in SQLConf
### Why are the changes needed?
typo fix to increase readability
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No tests needed; this only fixes a typo.
Closes#29668 from Ted-Jiang/fix_annotate.
Authored-by: yangjiang <yangjiang@ebay.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
We made a mistake in https://github.com/apache/spark/pull/29502, as there is no code comment to explain why we can't load the UDF class when creating functions. This PR improves the code comment.
### Why are the changes needed?
To avoid making the same mistake.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
N/A
Closes#29713 from cloud-fan/comment.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
This PR changes `RangeExec` so that the number of partitions is also changed to zero for an empty range when WholeStageCodegen is disabled or falls back.
In the current master, if WholeStageCodegen takes effect, the number of partitions of an empty range is changed to zero.
```
spark.range(1, 1, 1, 1000).rdd.getNumPartitions
res0: Int = 0
```
But it isn't if WholeStageCodegen is disabled or falls back.
```
spark.conf.set("spark.sql.codegen.wholeStage", false)
spark.range(1, 1, 1, 1000).rdd.getNumPartitions
res2: Int = 1000
```
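With this change, the non-codegen path is expected to match the codegen behavior:
```
spark.conf.set("spark.sql.codegen.wholeStage", false)
spark.range(1, 1, 1, 1000).rdd.getNumPartitions
res2: Int = 0
```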
### Why are the changes needed?
To achieve better performance even when WholeStageCodegen is disabled or falls back.
### Does this PR introduce _any_ user-facing change?
Yes. The number of partitions returned by `getNumPartitions` for an empty range changes when WholeStageCodegen is disabled.
### How was this patch tested?
New test.
Closes#29681 from sarutak/zero-size-range.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
Spark SQL has a bug, shown below:
```
spark.sql(
  " SELECT COUNT(DISTINCT 2), COUNT(DISTINCT 2, 3)")
  .show()
+-----------------+--------------------+
|count(DISTINCT 2)|count(DISTINCT 2, 3)|
+-----------------+--------------------+
| 1| 1|
+-----------------+--------------------+
spark.sql(
  " SELECT COUNT(DISTINCT 2), COUNT(DISTINCT 3, 2)")
  .show()
+-----------------+--------------------+
|count(DISTINCT 2)|count(DISTINCT 3, 2)|
+-----------------+--------------------+
| 1| 0|
+-----------------+--------------------+
```
The first query is correct, but the second one is not.
The root cause is that the second query is rewritten by `RewriteDistinctAggregates`, which expands the output but loses the literal 2.
### Why are the changes needed?
Fix a bug.
`SELECT COUNT(DISTINCT 2), COUNT(DISTINCT 3, 2)` should return `1, 1`
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
New UT
Closes#29626 from beliefer/support-multiple-foldable-distinct-expressions.
Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: beliefer <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
In this PR, we add an upfront check on STRING-form interval values before parsing multiple-unit intervals, and fail directly if the interval value contains alphabetic characters, to prevent correctness issues like `interval '1 day 2' day` = `3 days`.
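A hedged sketch of such a check for the `interval '<value>' <unit> ...` form (names assumed; the real parser raises a `ParseException`):
```scala
def checkIntervalValue(value: String): Unit =
  // A quoted value followed by explicit unit keywords must be numeric-only.
  if (value.exists(_.isLetter)) {
    throw new IllegalArgumentException(s"Cannot parse the INTERVAL value: $value")
  }

checkIntervalValue("1")          // OK for: interval '1' day
// checkIntervalValue("1 day 2") // throws, preventing interval '1 day 2' day = 3 days
```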
### Why are the changes needed?
Fix a correctness issue.
### Does this PR introduce _any_ user-facing change?
Yes. In Spark 3.0.0, `interval '1 day 2' day` = `3 days`, but now we fail with a `ParseException`.
### How was this patch tested?
add a test.
Closes#29708 from yaooqinn/SPARK-32840.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR intends to fix an existing bug in `UserDefinedTypeSuite`, shown below:
```
[info] - SPARK-19311: UDFs disregard UDT type hierarchy (931 milliseconds)
16:22:35.936 WARN org.apache.spark.sql.catalyst.expressions.SafeProjection: Expr codegen error and falling back to interpreter mode
org.apache.spark.SparkException: Cannot cast org.apache.spark.sql.ExampleSubTypeUDT46b1771f to org.apache.spark.sql.ExampleBaseTypeUDT31e8d979.
at org.apache.spark.sql.catalyst.expressions.CastBase.nullSafeCastFunction(Cast.scala:891)
at org.apache.spark.sql.catalyst.expressions.CastBase.doGenCode(Cast.scala:852)
at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:147)
...
```
### Why are the changes needed?
bugfix
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests.
Closes#29691 from maropu/FixUdtBug.
Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
This patch proposes to update the doc (both SS guide doc and Dataset dropDuplicates method doc) to leave a note to check on using SQL statements with streaming Dataset.
Once end users create a temp view based on a streaming Dataset, they tend to stop thinking about "streaming" and treat it as they would a batch query. In many cases that works, but not so smoothly when streaming aggregation is involved; they still need to be concerned about maintaining the state store.
### Why are the changes needed?
Although SPARK-32456 fixed the weird error message, as a side effect some operations are now enabled on streaming workloads via SQL statements, which is error-prone if end users aren't aware of what they're doing.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Only doc change.
Closes#29461 from HeartSaVioR/SPARK-32456-FOLLOWUP-DOC.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR aims to fix nondeterministic behavior in DataStreamReader/Writer options, like the following.
```scala
scala> spark.readStream.format("parquet").option("paTh", "1").option("PATH", "2").option("Path", "3").option("patH", "4").option("path", "5").load()
org.apache.spark.sql.AnalysisException: Path does not exist: 1;
```
### Why are the changes needed?
This will make the behavior deterministic.
### Does this PR introduce _any_ user-facing change?
Yes, but the previous behavior was nondeterministic.
### How was this patch tested?
Pass the newly added test cases.
Closes#29702 from dongjoon-hyun/SPARK-32832.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR renames `SupportsStreamingUpdate` to `SupportsStreamingUpdateAsAppend`, as the new interface name represents the actual behavior more clearly. This PR also removes the `update()` method (so the interface becomes more of a marker interface), as implementations of `SupportsStreamingUpdateAsAppend` should support append mode by default, hence there is no need to trigger a flag on it.
### Why are the changes needed?
`SupportsStreamingUpdate` was intended to revive the functionality of the streaming update output mode for internal data sources, but despite the name, the interface isn't really used to do an actual update on the sink; all sinks implement this interface to do append. So strictly speaking, it only supports update as append. Renaming the interface makes this clear.
### Does this PR introduce _any_ user-facing change?
No, as the class is only for internal data sources.
### How was this patch tested?
Jenkins test will follow.
Closes#29693 from HeartSaVioR/SPARK-32831.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
Upgrade Apache Arrow to version 1.0.1 for the Java dependency and increase minimum version of PyArrow to 1.0.0.
This release marks a transition to binary stability of the columnar format (which was already informally backward-compatible going back to December 2017) and a transition to Semantic Versioning for the Arrow software libraries. Also note that the Java arrow-memory artifact has been split to separate dependence on netty-buffer and allow users to select an allocator. Spark will continue to use `arrow-memory-netty` to maintain performance benefits.
Versions 1.0.0 and 1.0.1 include the following selected fixes/improvements relevant to Spark users:
- ARROW-9300 - [Java] Separate Netty Memory to its own module
- ARROW-9272 - [C++][Python] Reduce complexity in python to arrow conversion
- ARROW-9016 - [Java] Remove direct references to Netty/Unsafe Allocators
- ARROW-8664 - [Java] Add skip null check to all Vector types
- ARROW-8485 - [Integration][Java] Implement extension types integration
- ARROW-8434 - [C++] Ipc RecordBatchFileReader deserializes the Schema multiple times
- ARROW-8314 - [Python] Provide a method to select a subset of columns of a Table
- ARROW-8230 - [Java] Move Netty memory manager into a separate module
- ARROW-8229 - [Java] Move ArrowBuf into the Arrow package
- ARROW-7955 - [Java] Support large buffer for file/stream IPC
- ARROW-7831 - [Java] unnecessary buffer allocation when calling splitAndTransferTo on variable width vectors
- ARROW-6111 - [Java] Support LargeVarChar and LargeBinary types and add integration test with C++
- ARROW-6110 - [Java] Support LargeList Type and add integration test with C++
- ARROW-5760 - [C++] Optimize Take implementation
- ARROW-300 - [Format] Add body buffer compression option to IPC message protocol using LZ4 or ZSTD
- ARROW-9098 - RecordBatch::ToStructArray cannot handle record batches with 0 column
- ARROW-9066 - [Python] Raise correct error in isnull()
- ARROW-9223 - [Python] Fix to_pandas() export for timestamps within structs
- ARROW-9195 - [Java] Wrong usage of Unsafe.get from bytearray in ByteFunctionsHelper class
- ARROW-7610 - [Java] Finish support for 64 bit int allocations
- ARROW-8115 - [Python] Conversion when mixing NaT and datetime objects not working
- ARROW-8392 - [Java] Fix overflow related corner cases for vector value comparison
- ARROW-8537 - [C++] Performance regression from ARROW-8523
- ARROW-8803 - [Java] Row count should be set before loading buffers in VectorLoader
- ARROW-8911 - [C++] Slicing a ChunkedArray with zero chunks segfaults
View release notes here:
https://arrow.apache.org/release/1.0.1.html
https://arrow.apache.org/release/1.0.0.html
### Why are the changes needed?
Upgrade brings fixes, improvements and stability guarantees.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests with pyarrow 1.0.0 and 1.0.1
Closes#29686 from BryanCutler/arrow-upgrade-100-SPARK-32312.
Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
In Spark 3.0.0, `SparkGetColumnsOperation` could not recognize NULL columns, but now it can as a side effect of https://issues.apache.org/jira/browse/SPARK-32696 / f14f3742e0; however, test coverage for this change was not added.
In Spark, the column size for null fields should be 1. In this PR, we set the right column size for the null type.
### Why are the changes needed?
Test coverage, and fixing the client-side information about the null type exposed through JDBC.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added UTs for both this PR and SPARK-32696.
Closes#29687 from yaooqinn/SPARK-32826.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This patch proposes to check `ignoreNullability` parameter recursively in `equalsStructurally` method.
### Why are the changes needed?
`equalsStructurally` is used to check type equality. We can optionally ask it to ignore nullability. But the `ignoreNullability` parameter is not passed recursively down to nested types, so it produces a weird error like:
```
data type mismatch: argument 3 requires array<array<string>> type, however ... is of array<array<string>> type.
```
when running the query `select aggregate(split('abcdefgh',''), array(array('')), (acc, x) -> array(array( x ) ) )`.
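A minimal sketch of the intended recursion, assuming Spark's `DataType` hierarchy (not the actual `equalsStructurally` code):
```scala
import org.apache.spark.sql.types._

// Structural type equality that threads ignoreNullability into nested types.
def structurallyEqual(a: DataType, b: DataType, ignoreNullability: Boolean): Boolean =
  (a, b) match {
    case (ArrayType(ae, an), ArrayType(be, bn)) =>
      (ignoreNullability || an == bn) && structurallyEqual(ae, be, ignoreNullability)
    case (MapType(ak, av, an), MapType(bk, bv, bn)) =>
      (ignoreNullability || an == bn) &&
        structurallyEqual(ak, bk, ignoreNullability) &&
        structurallyEqual(av, bv, ignoreNullability)
    case (StructType(aFields), StructType(bFields)) =>
      aFields.length == bFields.length && aFields.zip(bFields).forall { case (f1, f2) =>
        f1.name == f2.name &&
          (ignoreNullability || f1.nullable == f2.nullable) &&
          structurallyEqual(f1.dataType, f2.dataType, ignoreNullability)
      }
    case _ => a == b
  }
```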
### Does this PR introduce _any_ user-facing change?
Yes, fixed a bug when running user query.
### How was this patch tested?
Unit tests.
Closes#29698 from viirya/SPARK-32819.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR aims to fix the test coverage at `DataStreamReaderWriterSuite`.
### Why are the changes needed?
Currently, the test case checks `DataStreamReader` options instead of `DataStreamWriter` options.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the revised test case.
Closes#29701 from dongjoon-hyun/SPARK-32836.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR is a follow up to https://github.com/apache/spark/pull/29543#discussion_r485409606, which correctly points out that the check for the empty string is not necessary.
### Why are the changes needed?
The unnecessary check actually could cause more confusion.
For example,
```scala
scala> Seq(1).toDF.write.option("path", "/tmp/path1").parquet("")
java.lang.IllegalArgumentException: Can not create a Path from an empty string
at org.apache.hadoop.fs.Path.checkPathArg(Path.java:168)
```
even when the `path` option is available. This PR fixes this confusion.
### Does this PR introduce _any_ user-facing change?
Yes, now the above example prints a consistent exception message whether or not the path parameter value is empty.
```scala
scala> Seq(1).toDF.write.option("path", "/tmp/path1").parquet("")
org.apache.spark.sql.AnalysisException: There is a 'path' option set and save() is called with a path parameter. Either remove the path option, or call save() without the parameter. To ignore this check, set 'spark.sql.legacy.pathOptionBehavior.enabled' to 'true'.;
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:290)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:856)
... 47 elided
```
### How was this patch tested?
Added unit tests.
Closes#29697 from imback82/SPARK-32516-followup.
Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Make MicroBatchExecution explicitly call `getBatch` when the start and end offsets are the same.
### Why are the changes needed?
Structured Streaming's micro-batch engine has a contract with V1 data sources that, after a restart, it will call `source.getBatch()` on the last batch attempted before the restart. However, a very rare combination of sequences violates this contract. It occurs only when:
- The streaming query has specific types of stateful operations with watermarks (e.g., aggregation in append mode, mapGroupsWithState with timeouts).
- These queries can execute a batch even without new data, when the previous batch updates the watermark and the stateful ops are such that the new watermark can cause new output/cleanup. Such batches are called no-data-batches.
- The last batch before termination was an incomplete no-data-batch. Upon restart, the micro-batch engine fails to call `source.getBatch` when attempting to re-execute the incomplete no-data-batch.
This occurs because no-data-batches have the same start and end offsets, and when a batch is executed, if the start and end offsets are the same then calling `source.getBatch` is skipped, as the generated plan is assumed to be empty. This only affects V1 data sources like Delta and Autoloader, which rely on this invariant to detect in the source whether the query is being started from scratch or restarted.
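A toy model of the corrected decision (hypothetical names, not the engine code):
```scala
case class OffsetRange(start: Long, end: Long)

// Before the fix, getBatch was skipped whenever start == end; after it, a
// re-executed (restarted) batch always consults the source.
def shouldCallGetBatch(range: OffsetRange, isRestartedBatch: Boolean): Boolean =
  range.start != range.end || isRestartedBatch
```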
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New unit test with a mock v1 source that fails without the fix.
Closes#29651 from tdas/SPARK-32794.
Authored-by: Tathagata Das <tathagata.das1565@gmail.com>
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
### What changes were proposed in this pull request?
The `--` method of `AttributeSet` behaves differently under Scala 2.12 and 2.13 because the `--` method of `LinkedHashSet` in Scala 2.13 does not maintain the insertion order.
This PR uses Scala 2.12-based code to ensure the `--` method of `AttributeSet` has the same behavior under Scala 2.12 and 2.13.
### Why are the changes needed?
The behavior of `AttributeSet` needs to be compatible with Scala 2.12 and 2.13
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Scala 2.12: Pass the Jenkins or GitHub Action
Scala 2.13: Manual test sub-suites of `PlanStabilitySuite`
- **Before**: 293 TESTS FAILED
- **After**: 13 TESTS FAILED (the remaining failures are not associated with the current issue)
Closes#29689 from LuciferYang/SPARK-32755-FOLLOWUP.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
The purpose of this PR is to partially resolve [SPARK-32808](https://issues.apache.org/jira/browse/SPARK-32808). A total of 26 failed test cases were fixed; the related suites are as follows:
- `StreamingAggregationSuite` related test cases (2 FAILED -> Pass)
- `GeneratorFunctionSuite` related test cases (2 FAILED -> Pass)
- `UDFSuite` related test cases (2 FAILED -> Pass)
- `SQLQueryTestSuite` related test cases (5 FAILED -> Pass)
- `WholeStageCodegenSuite` related test cases (1 FAILED -> Pass)
- `DataFrameSuite` related test cases (3 FAILED -> Pass)
- `OrcV1QuerySuite\OrcV2QuerySuite` related test cases (4 FAILED -> Pass)
- `ExpressionsSchemaSuite` related test cases (1 FAILED -> Pass)
- `DataFrameStatSuite` related test cases (1 FAILED -> Pass)
- `JsonV1Suite\JsonV2Suite\JsonLegacyTimeParserSuite` related test cases (6 FAILED -> Pass)
The main changes of this PR are as follows:
- Fix Scala 2.13 compilation problems in `ShuffleBlockFetcherIterator` and `Analyzer`
- Changed `Seq` to `scala.collection.Seq` in `objects.scala` and `GenericArrayData` because the internally used `Seq` may be a `mutable.ArraySeq` and it is not easy to call `.toSeq` on it
- Specified `Seq` as `scala.collection.Seq` when calling `Row.getAs[Seq]` and `Row.get(i).asInstanceOf[Seq]` because the data may be a `mutable.ArraySeq`, while plain `Seq` means `immutable.Seq` in Scala 2.13
- Used a compatible way to make the `+` and `-` methods of `Decimal` behave the same in Scala 2.12 and Scala 2.13
- Called `toList` in `RelationalGroupedDataset.toDF` when `groupingExprs` is a `Stream`, because `Stream` can't be serialized in Scala 2.13
- Added a manual sort to `classFunsMap` in `ExpressionsSchemaSuite` because `Iterable.groupBy` in Scala 2.13 returns different results from `TraversableLike.groupBy` in Scala 2.12
### Why are the changes needed?
We need to support a Scala 2.13 build.
### Does this PR introduce _any_ user-facing change?
Yes. Callers should specify `Seq` as `scala.collection.Seq` when calling `Row.getAs[Seq]` or `Row.get(i).asInstanceOf[Seq]`, because the data may be a `mutable.ArraySeq` while plain `Seq` means `immutable.Seq` in Scala 2.13.
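For example (hypothetical data):
```scala
import org.apache.spark.sql.Row
import scala.collection.mutable

val row = Row(mutable.ArraySeq(1, 2, 3))
// Requesting the collection-agnostic supertype works on both 2.12 and 2.13:
val xs = row.getAs[scala.collection.Seq[Int]](0)
```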
### How was this patch tested?
- Scala 2.12: Pass the Jenkins or GitHub Action
- Scala 2.13: Do the following:
```
dev/change-scala-version.sh 2.13
mvn clean install -DskipTests -pl sql/core -Pscala-2.13 -am
mvn test -pl sql/core -Pscala-2.13
```
**Before**
```
Tests: succeeded 8166, failed 319, canceled 1, ignored 52, pending 0
*** 319 TESTS FAILED ***
```
**After**
```
Tests: succeeded 8204, failed 286, canceled 1, ignored 52, pending 0
*** 286 TESTS FAILED ***
```
Closes#29660 from LuciferYang/SPARK-32808.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
If no active SparkSession is available, let `FileSourceScanExec.needsUnsafeRowConversion` look at the default SQL config for the Parquet vectorized reader instead of failing the query execution.
### Why are the changes needed?
Fix a bug where, if no active SparkSession is available, a file-based data source scan for the Parquet source throws an exception.
### Does this PR introduce _any_ user-facing change?
Yes, this change fixes the bug.
### How was this patch tested?
Unit test.
Closes#29667 from viirya/SPARK-32813.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
In the PR, I propose to move the test `SPARK-32810: CSV and JSON data sources should be able to read files with escaped glob metacharacter in the paths` from `DataFrameReaderWriterSuite` to `CSVSuite` and `JsonSuite`. This allows running the same test in `CSVv1Suite`/`CSVv2Suite` and in `JsonV1Suite`/`JsonV2Suite`.
### Why are the changes needed?
To improve test coverage by checking JSON/CSV datasources v1 and v2.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
By running affected test suites:
```
$ build/sbt "sql/test:testOnly org.apache.spark.sql.execution.datasources.csv.*"
$ build/sbt "sql/test:testOnly org.apache.spark.sql.execution.datasources.json.*"
```
Closes#29684 from MaxGekk/globbing-paths-when-inferring-schema-dsv2.
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Fix indentation and clean up view in the test added by https://github.com/apache/spark/pull/29593.
### Why are the changes needed?
Address review comments in https://github.com/apache/spark/pull/29665.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Updated test.
Closes#29682 from manuzhang/spark-32753-followup.
Authored-by: manuzhang <owenzhang1990@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
In `SubqueryBroadcastExec.relationFuture`, if the `broadcastRelation` is an `EmptyHashedRelation`, then `broadcastRelation.keys()` throws `UnsupportedOperationException`. This PR handles the empty relation case to avoid the exception.
### Why are the changes needed?
To fix a bug.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a new test.
Closes#29671 from wzhfy/dpp_empty_broadcast.
Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
Before SPARK-31511 was fixed, `BytesToBytesMap.iterator()` was not thread-safe and could cause data inaccuracy.
We need to add a unit test.
### Why are the changes needed?
Increase test coverage to ensure that iterator() is thread-safe.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a UT.
Closes#29669 from cxzl25/SPARK-31511-test.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This reverts commit 04f7f6dac0 due to the discussion in [comment](https://github.com/apache/spark/pull/29589#discussion_r484657207).
### Why are the changes needed?
Based on the discussion in [comment](https://github.com/apache/spark/pull/29589#discussion_r484657207), propagation for thread local properties in `SubqueryBroadcastExec` is not necessary, since they will be propagated by broadcast exchange threads anyway.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Also revert the added test.
Closes#29674 from wzhfy/revert_dpp_thread_local.
Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
This is a Spark 3.0 regression introduced by https://github.com/apache/spark/pull/26761. We missed a corner case: `java.lang.Double.compare` treats 0.0 and -0.0 as different, which breaks SQL semantics.
This PR adds back `OrderingUtil`, to provide custom compare methods that take care of 0.0 vs -0.0.
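For reference, a self-contained sketch of a comparison with the desired semantics (not necessarily the exact `OrderingUtil` code):
```scala
// Treat 0.0 and -0.0 as equal via primitive ==, falling back to
// java.lang.Double.compare only for NaN cases.
def compareDoubles(x: Double, y: Double): Int = {
  if (x < y) -1
  else if (x > y) 1
  else if (x == y) 0                   // true for 0.0 vs -0.0
  else java.lang.Double.compare(x, y)  // only NaN reaches here
}

assert(compareDoubles(0.0, -0.0) == 0)           // SQL semantics
assert(java.lang.Double.compare(0.0, -0.0) > 0)  // the Java corner case
```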
### Why are the changes needed?
Fix a correctness bug.
### Does this PR introduce _any_ user-facing change?
Yes, now `SELECT 0.0 > -0.0` correctly returns false, as in Spark 2.x.
### How was this patch tested?
new tests
Closes#29647 from cloud-fan/float.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/29485
It moves the plan rewriting methods from `Analyzer` to `QueryPlan`, so that they can work with `SparkPlan` as well. This PR also improves support for a corner case (the attribute to be replaced stays together with an unresolved attribute) and makes the rewriting more general, so that `WidenSetOperationTypes` can rewrite the plan in one shot as before.
### Why are the changes needed?
Code cleanup and generalization.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing test
Closes#29643 from cloud-fan/cleanup.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
In the PR, I propose to fix an issue with the CSV and JSON data sources in Spark SQL when both of the following are true:
* no user specified schema
* some file paths contain escaped glob metacharacters, such as `[`, `]`, `{`, `}`, `*`, etc.
### Why are the changes needed?
To fix the issue where the following two queries try to read from the paths `[abc].csv` and `[abc].json`:
```scala
spark.read.csv("""/tmp/\[abc\].csv""").show
spark.read.json("""/tmp/\[abc\].json""").show
```
but would end up hitting an exception:
```
org.apache.spark.sql.AnalysisException: Path does not exist: file:/tmp/[abc].csv;
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:722)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:392)
```
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
Added new test cases in `DataFrameReaderWriterSuite`.
Closes#29659 from MaxGekk/globbing-paths-when-inferring-schema.
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Only copy tags to nodes with no tags when transforming plans.
### Why are the changes needed?
cloud-fan [made a good point](https://github.com/apache/spark/pull/29593#discussion_r482013121) that it doesn't make sense to append tags to existing nodes when nodes are removed. That can cause bugs such as duplicate rows when deduplicating and repartitioning by the same column with AQE.
```
spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
val df = spark.sql("select id from v1 group by id distribute by id")
println(df.collect().toArray.mkString(","))
println(df.queryExecution.executedPlan)
// With AQE
[4],[0],[3],[2],[1],[7],[6],[8],[5],[9],[4],[0],[3],[2],[1],[7],[6],[8],[5],[9]
AdaptiveSparkPlan(isFinalPlan=true)
+- CustomShuffleReader local
+- ShuffleQueryStage 0
+- Exchange hashpartitioning(id#183L, 10), true
+- *(3) HashAggregate(keys=[id#183L], functions=[], output=[id#183L])
+- Union
:- *(1) Range (0, 10, step=1, splits=2)
+- *(2) Range (0, 10, step=1, splits=2)
// Without AQE
[4],[7],[0],[6],[8],[3],[2],[5],[1],[9]
*(4) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
+- Exchange hashpartitioning(id#206L, 10), true
+- *(3) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
+- Union
:- *(1) Range (0, 10, step=1, splits=2)
+- *(2) Range (0, 10, step=1, splits=2)
```
It's too expensive to detect node removal, so we compromise by only copying tags to nodes that have no tags.
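A toy model of the compromise (hypothetical `Node` type, not Spark's `TreeNode` API):
```scala
import scala.collection.mutable

class Node { val tags: mutable.Map[String, String] = mutable.Map.empty }

// Tags move to a replacement node only if it has none, so a node that
// replaces a removed child keeps its own tags instead of accumulating both.
def copyTagsFrom(oldNode: Node, newNode: Node): Unit =
  if (newNode.tags.isEmpty) newNode.tags ++= oldNode.tags
```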
### Does this PR introduce _any_ user-facing change?
Yes. Fix a bug.
### How was this patch tested?
Add test.
Closes#29593 from manuzhang/spark-32753.
Authored-by: manuzhang <owenzhang1990@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Since [SPARK-22590](2854091d12), local property propagation is supported through `SQLExecution.withThreadLocalCaptured` in both `BroadcastExchangeExec` and `SubqueryExec` when computing `relationFuture`. This PR adds the same support in `SubqueryBroadcastExec`.
### Why are the changes needed?
Local property propagation is missed in `SubqueryBroadcastExec`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add a new test.
Closes#29589 from wzhfy/thread_local.
Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
There is no need to use the database name in the `loadPartition` API of `Shim_v3_0` to get the Hive table; Hive has an overloaded method that returns the table given just the table name. By using this API, the dependency on `SessionCatalog` can be removed from the shim layer.
### Why are the changes needed?
To avoid deadlock when communicating with Hive metastore 3.1.x
```
Found one Java-level deadlock:
=============================
"worker3":
waiting to lock monitor 0x00007faf0be602b8 (object 0x00000007858f85f0, a org.apache.spark.sql.hive.HiveSessionCatalog),
which is held by "worker0"
"worker0":
waiting to lock monitor 0x00007faf0be5fc88 (object 0x0000000785c15c80, a org.apache.spark.sql.hive.HiveExternalCatalog),
which is held by "worker3"
Java stack information for the threads listed above:
===================================================
"worker3":
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCurrentDatabase(SessionCatalog.scala:256)
- waiting to lock <0x00000007858f85f0> (a org.apache.spark.sql.hive.HiveSessionCatalog)
at org.apache.spark.sql.hive.client.Shim_v3_0.loadPartition(HiveShim.scala:1332)
at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$loadPartition$1(HiveClientImpl.scala:870)
at org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$4459/1387095575.apply$mcV$sp(Unknown Source)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
at org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$2227/313239499.apply(Unknown Source)
at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
- locked <0x0000000785ef9d78> (a org.apache.spark.sql.hive.client.IsolatedClientLoader)
at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:276)
at org.apache.spark.sql.hive.client.HiveClientImpl.loadPartition(HiveClientImpl.scala:860)
at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$loadPartition$1(HiveExternalCatalog.scala:911)
at org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$4457/2037578495.apply$mcV$sp(Unknown Source)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
- locked <0x0000000785c15c80> (a org.apache.spark.sql.hive.HiveExternalCatalog)
at org.apache.spark.sql.hive.HiveExternalCatalog.loadPartition(HiveExternalCatalog.scala:890)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.loadPartition(ExternalCatalogWithListener.scala:179)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadPartition(SessionCatalog.scala:512)
at org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:383)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
- locked <0x00000007b1690ff8> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
at org.apache.spark.sql.Dataset$$Lambda$2084/428667685.apply(Unknown Source)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
at org.apache.spark.sql.Dataset$$Lambda$2085/559530590.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2093/139449177.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2086/1088974677.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.Dataset$$$Lambda$1959/1977822284.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
at org.apache.spark.sql.SparkSession$$Lambda$1899/424830920.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:45)
at java.lang.Thread.run(Thread.java:748)
"worker0":
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
- waiting to lock <0x0000000785c15c80> (a org.apache.spark.sql.hive.HiveExternalCatalog)
at org.apache.spark.sql.hive.HiveExternalCatalog.tableExists(HiveExternalCatalog.scala:851)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.tableExists(ExternalCatalogWithListener.scala:146)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:432)
- locked <0x00000007858f85f0> (a org.apache.spark.sql.hive.HiveSessionCatalog)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireTableExists(SessionCatalog.scala:185)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadPartition(SessionCatalog.scala:509)
at org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:383)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
- locked <0x00000007b529af58> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
at org.apache.spark.sql.Dataset$$Lambda$2084/428667685.apply(Unknown Source)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
at org.apache.spark.sql.Dataset$$Lambda$2085/559530590.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2093/139449177.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2086/1088974677.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.Dataset$$$Lambda$1959/1977822284.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
at org.apache.spark.sql.SparkSession$$Lambda$1899/424830920.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:45)
at java.lang.Thread.run(Thread.java:748)
Found 1 deadlock.
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested by executing the script below in spark-shell; no deadlock was found.
Launch spark-shell using `./bin/spark-shell --conf "spark.sql.hive.metastore.jars=maven" --conf spark.sql.hive.metastore.version=3.1 --conf spark.hadoop.datanucleus.schema.autoCreateAll=true`.
**code**
```
def testHiveDeadLock = {
  import scala.collection.mutable.ArrayBuffer
  import scala.util.Random
  println("test hive DeadLock")
  spark.sql("drop database if exists testDeadLock cascade")
  spark.sql("create database testDeadLock")
  spark.sql("use testDeadLock")
  val tableCount = 100
  val tableNamePrefix = "testdeadlock"
  for (i <- 0 until tableCount) {
    val tableName = s"$tableNamePrefix${i + 1}"
    spark.sql(s"drop table if exists $tableName")
    spark.sql(s"create table $tableName (a bigint) partitioned by (b bigint) stored as orc")
  }
  val threads = new ArrayBuffer[Thread]
  for (i <- 0 until tableCount) {
    threads.append(new Thread(new Runnable {
      override def run: Unit = {
        val tableName = s"$tableNamePrefix${i + 1}"
        val rand = Random
        val df = spark.range(0, 20000).toDF("a")
        val location = s"/tmp/${rand.nextLong.abs}"
        df.write.mode("overwrite").orc(location)
        spark.sql(
          s"""
          LOAD DATA LOCAL INPATH '$location' INTO TABLE $tableName partition (b=$i)""")
      }
    }, s"worker$i"))
    threads(i).start()
  }
  for (i <- 0 until tableCount) {
    println(s"Joining with thread $i")
    threads(i).join()
  }
  for (i <- 0 until tableCount) {
    val tableName = s"$tableNamePrefix${i + 1}"
    spark.sql(s"select count(*) from $tableName").show(false)
  }
  println("All done")
}

for (i <- 0 until 100) {
  testHiveDeadLock
  println(s"completed ${i}th iteration")
}
```
Closes#29649 from sandeep-katta/metastore3.1DeadLock.
Authored-by: sandeep.katta <sandeep.katta2007@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Change `CreateFunctionCommand` to add a class check before creating the function.
### Why are the changes needed?
We have different behaviors between creating a permanent function and a temporary function when the function class is invalid, e.g.:
```
create function f as 'test.non.exists.udf';
-- Time taken: 0.104 seconds
create temporary function f as 'test.non.exists.udf'
-- Error in query: Can not load class 'test.non.exists.udf' when registering the function 'f', please make sure it is on the classpath;
```
And Hive also fails both of them.
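A hedged sketch of such an upfront check using the standard JVM class-loading API (the actual patch may differ):
```scala
def assertFunctionClassLoadable(className: String, functionName: String): Unit =
  try {
    // Verify the class is loadable without initializing it.
    Class.forName(className, false, Thread.currentThread().getContextClassLoader)
  } catch {
    case _: ClassNotFoundException =>
      // Spark would raise an AnalysisException here; a plain exception
      // keeps this sketch self-contained.
      throw new IllegalArgumentException(
        s"Can not load class '$className' when registering the function '$functionName', " +
          "please make sure it is on the classpath")
  }
```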
### Does this PR introduce _any_ user-facing change?
Yes, users will get an exception when creating an invalid UDF.
### How was this patch tested?
New test.
Closes#29502 from ulysses-you/function.
Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Bugfix for incomplete interval values, e.g. `interval '1'`, `interval '1 day 2'`. Currently these cases result in null, but we should fail them with an `IllegalArgumentException`.
### Why are the changes needed?
correctness
### Does this PR introduce _any_ user-facing change?
Yes, incomplete intervals now throw an exception.
#### before
```
bin/spark-sql -S -e "select interval '1', interval '+', interval '1 day -'"
NULL NULL NULL
```
#### after
```
-- !query
select interval '1'
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.parser.ParseException
Cannot parse the INTERVAL value: 1(line 1, pos 7)
== SQL ==
select interval '1'
```
### How was this patch tested?
unit tests added
Closes#29635 from yaooqinn/SPARK-32785.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Currently, Spark's public REST APIs support the application `attemptId` except for the SQL API. This causes a `no such app: application_X` issue when the application has an `attemptId` (e.g., YARN cluster mode).
The existing endpoints and the endpoints requiring `attemptId` support are listed below.
```
// Existing Rest Endpoints
applications/{appId}/sql
applications/{appId}/sql/{executionId}
// Rest Endpoints required support
applications/{appId}/{attemptId}/sql
applications/{appId}/{attemptId}/sql/{executionId}
```
Also fixing following compile warning on `SqlResourceSuite`:
```
[WARNING] [Warn] ~/spark/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala:67: Reference to uninitialized value edges
```
### Why are the changes needed?
This causes a `no such app: application_X` issue when the application has an `attemptId`.
### Does this PR introduce _any_ user-facing change?
Not yet, because the SQL REST API is planned to be released with `Spark 3.1`.
### How was this patch tested?
1. New unit tests are added for the existing REST endpoints. `attemptId` does not appear in local mode (only in YARN cluster mode), so a test for the `attemptId` case could not be added (suggestions are welcome).
2. Also, the patch has been tested manually through both the Spark Core and History Server REST APIs.
Closes#29364 from erenavsarogullari/SPARK-32548.
Authored-by: Eren Avsarogullari <erenavsarogullari@gmail.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
### What changes were proposed in this pull request?
This PR is a followup on #29598 and removes the `ExpressionSet` class from the 2.13 branch.
### Why are the changes needed?
`ExpressionSet` does not extend Scala `Set` anymore and this class is no longer needed in the 2.13 branch.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passes existing tests
Closes#29648 from dbaliafroozeh/RemoveExpressionSetFrom2.13Branch.
Authored-by: Ali Afroozeh <ali.afroozeh@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
`sql-expression-schema.md` is automatically generated by `ExpressionsSchemaSuite`, but only the expression entries are checked in `ExpressionsSchemaSuite`. So if we manually modify the contents of the file, `ExpressionsSchemaSuite` does not necessarily guarantee its correctness. For example, [SPARK-24884](https://github.com/apache/spark/pull/27507) added `regexp_extract_all` expression support and manually modified `sql-expression-schema.md` without changing the `Number of queries` entry, causing a file content inconsistency.
Some additional checks have been added to `ExpressionsSchemaSuite` to improve the correctness guarantee of `sql-expression-schema.md` as follow:
- `Number of queries` should equal the number of expression entries in `sql-expression-schema.md`
- `Number of expressions that missing example` should equal the number of `Expressions missing examples` entries in `sql-expression-schema.md`
- The `MissExamples` computed by the suite should be the same as the `expectedMissingExamples` from `sql-expression-schema.md`
### Why are the changes needed?
Ensure the correctness of `sql-expression-schema.md` content.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Enhanced `ExpressionsSchemaSuite`.
Closes#29608 from LuciferYang/sql-expression-schema.
Authored-by: yangjie <yangjie@MacintoshdeMacBook-Pro.local>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
Remove legacy silent support mode for spark-sql CLI.
### Why are the changes needed?
https://github.com/apache/spark/pull/29619 add new silent mode. We can remove legacy silent support mode.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test:
```
LM-SHC-16508156:spark yumwang$ bin/spark-sql -S
NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of assembly.
20/09/03 09:06:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/09/03 09:06:16 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
20/09/03 09:06:16 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
20/09/03 09:06:19 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
20/09/03 09:06:19 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore yumwang10.226.196.190
spark-sql> select * from test1;
1
spark-sql> select * from test1;
1
```
Closes#29631 from wangyum/SPARK-32772.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
### What changes were proposed in this pull request?
The whole `DynamicPartitionPruningSuite` takes about 2 min on my laptop (either AE on or off). The slowest tests are `test("simple inner join triggers DPP with mock-up tables")` and `test("cleanup any DPP filter that isn't pushed down due to expression id clashes")`, which totally take about 1 min.
We can reuse existing test tables or use smaller tables to reduce the cost. After that, the two tests take only about 1 sec in total, leading to a 2x speedup for the suite.
### Why are the changes needed?
To speedup DPP test suites.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Modified two existing tests.
Closes#29636 from wzhfy/improve_dpp_test.
Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
This PR fixes a bug in `FileSourceStrategy`, which generates partition filters even if the table is not partitioned. This can confuse `FileSourceScanExec`, which mistakenly thinks the table is partitioned, tries to update the `numPartitions` metrics, and causes a failure. We should not generate partition filters for a non-partitioned table.
### Why are the changes needed?
The bug was exposed by https://github.com/apache/spark/pull/29436.
### Does this PR introduce _any_ user-facing change?
Yes, fix a bug.
### How was this patch tested?
new test
Closes#29637 from cloud-fan/refactor.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
### What changes were proposed in this pull request?
This PR intends to fix a bug where references can be missing when adding aliases to widen data types in `WidenSetOperationTypes`. For example,
```
CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v);
SELECT t.v FROM (
SELECT v FROM t3
UNION ALL
SELECT v + v AS v FROM t3
) t;
org.apache.spark.sql.AnalysisException: Resolved attribute(s) v#1 missing from v#3 in operator !Project [v#1]. Attribute(s) with the same name appear in the operation: v. Please check if the right attribute(s) are used.;;
!Project [v#1] <------ the reference got missing
+- SubqueryAlias t
+- Union
:- Project [cast(v#1 as decimal(11,0)) AS v#3]
: +- Project [v#1]
: +- SubqueryAlias t3
: +- SubqueryAlias tbl
: +- LocalRelation [v#1]
+- Project [v#2]
+- Project [CheckOverflow((promote_precision(cast(v#1 as decimal(11,0))) + promote_precision(cast(v#1 as decimal(11,0)))), DecimalType(11,0), true) AS v#2]
+- SubqueryAlias t3
+- SubqueryAlias tbl
+- LocalRelation [v#1]
```
In this case, `WidenSetOperationTypes` added the alias `cast(v#1 as decimal(11,0)) AS v#3`, and then the reference in the top `Project` went missing. This PR corrects the reference (the `exprId` and the widened `dataType`) after adding aliases in the rule.
### Why are the changes needed?
bugfixes
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests
Closes#29485 from maropu/SPARK-32638.
Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
LeftSemi and Existence SortMergeJoin should not buffer all matching right-side rows when the bound condition is empty; this is unnecessary and can lead to performance degradation, especially when spilling happens.
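To illustrate the idea on plain sorted lists (a toy model, not the SortMergeJoin code): once any matching key is found, the left row can be emitted without buffering the run of matching right-side rows.
```scala
def sortedLeftSemi(left: List[Int], right: List[Int]): List[Int] = (left, right) match {
  case (Nil, _) | (_, Nil) => Nil
  case (l :: ls, r :: rs) =>
    if (l < r) sortedLeftSemi(ls, right)      // left key too small: no match exists
    else if (l > r) sortedLeftSemi(left, rs)  // advance the right side
    else l :: sortedLeftSemi(ls, right)       // first match suffices: emit, no buffering
}
```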
### Why are the changes needed?
Performance improvement.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New UT and TPCDS benchmarks.
Closes#29572 from peter-toth/SPARK-32730-improve-leftsemi-sortmergejoin.
Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR changes `AttributeSet` and `ExpressionSet` to maintain the insertion order of their elements. More specifically, we:
- change the underlying data structure of `AttributeSet` from `HashSet` to `LinkedHashSet` to maintain the insertion order.
- remove the inheritance of `ExpressionSet` from `immutable.Set` and implement the needed methods directly. `ExpressionSet` already uses a list to keep track of the expressions; however, since it extended Scala's `immutable.Set`, operations such as `map` and `flatMap` were delegated to `immutable.Set` itself. This meant the result of these operations was no longer an instance of `ExpressionSet`, but rather an implementation picked by the parent class. `ExpressionSet` has very specific semantics and it does not make sense to extend `immutable.Set` anyway.
- change the `PlanStabilitySuite` to not sort the attributes, to be able to catch changes in the order of expressions in different runs.
### Why are the changes needed?
Expression identity is based on `ExprId`, which is an auto-incremented number. This means the same query can yield a query plan with different expression ids in different runs. `AttributeSet` and `ExpressionSet` internally used a `HashSet` as the underlying data structure and therefore could not guarantee a fixed order of elements across runs. This can be problematic in cases where we want to check for plan changes across runs.
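A quick illustration of the ordering property that motivates the change:
```scala
import scala.collection.mutable

val hashed = mutable.HashSet(30, 10, 20)       // iteration order depends on hashing
val linked = mutable.LinkedHashSet(30, 10, 20) // iteration follows insertion order
println(linked.mkString(", "))                 // 30, 10, 20
```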
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passes `PlanStabilitySuite` after regenerating the golden files.
Closes#29598 from dbaliafroozeh/FixOrderOfExpressions.
Authored-by: Ali Afroozeh <ali.afroozeh@databricks.com>
Signed-off-by: herman <herman@databricks.com>
### What changes were proposed in this pull request?
Move StreamingRelationV2 to the catalyst module and bind with the Table interface.
### Why are the changes needed?
Currently, `StreamingRelationV2` is bound to `TableProvider`. Since the V2 relation is not bound to a `DataSource`, to make it more flexible and extensible, it should be moved to the catalyst module and bound to the `Table` interface. We did a similar thing for `DataSourceV2Relation`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing UT.
Closes#29633 from xuanyuanking/SPARK-32782.
Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR makes interval value parsing fail when the value contains non-ASCII characters, which are silently omitted right now.
e.g. the case below should be invalid
```
select interval 'interval中文 1 day'
```
### Why are the changes needed?
bugfix, intervals should fail when containing invalid characters
### Does this PR introduce _any_ user-facing change?
yes,
#### before
`select interval 'interval中文 1 day'` resulted in `1 day`; now it fails with
```
org.apache.spark.sql.catalyst.parser.ParseException
Cannot parse the INTERVAL value: interval中文 1 day
```
### How was this patch tested?
new tests
Closes#29632 from yaooqinn/SPARK-32781.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR reduces log messages for spark-sql CLI like spark-shell and pyspark CLI.
### Why are the changes needed?
When we launch the spark-sql CLI, too many log messages are shown and it's sometimes difficult to find the result of a query.
```
spark-sql> SELECT now();
20/09/02 00:11:45 INFO CodeGenerator: Code generated in 10.121625 ms
20/09/02 00:11:45 INFO SparkContext: Starting job: main at NativeMethodAccessorImpl.java:0
20/09/02 00:11:45 INFO DAGScheduler: Got job 0 (main at NativeMethodAccessorImpl.java:0) with 1 output partitions
20/09/02 00:11:45 INFO DAGScheduler: Final stage: ResultStage 0 (main at NativeMethodAccessorImpl.java:0)
20/09/02 00:11:45 INFO DAGScheduler: Parents of final stage: List()
20/09/02 00:11:45 INFO DAGScheduler: Missing parents: List()
20/09/02 00:11:45 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
20/09/02 00:11:45 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 6.3 KiB, free 366.3 MiB)
20/09/02 00:11:45 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 3.2 KiB, free 366.3 MiB)
20/09/02 00:11:45 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.204:42615 (size: 3.2 KiB, free: 366.3 MiB)
20/09/02 00:11:45 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1348
20/09/02 00:11:45 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
20/09/02 00:11:45 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
20/09/02 00:11:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.1.204, executor driver, partition 0, PROCESS_LOCAL, 7561 bytes) taskResourceAssignments Map()
20/09/02 00:11:45 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
20/09/02 00:11:45 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1446 bytes result sent to driver
20/09/02 00:11:45 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 238 ms on 192.168.1.204 (executor driver) (1/1)
20/09/02 00:11:45 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
20/09/02 00:11:45 INFO DAGScheduler: ResultStage 0 (main at NativeMethodAccessorImpl.java:0) finished in 0.343 s
20/09/02 00:11:45 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
20/09/02 00:11:45 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
20/09/02 00:11:45 INFO DAGScheduler: Job 0 finished: main at NativeMethodAccessorImpl.java:0, took 0.377489 s
2020-09-02 00:11:45.07
Time taken: 0.704 seconds, Fetched 1 row(s)
20/09/02 00:11:45 INFO SparkSQLCLIDriver: Time taken: 0.704 seconds, Fetched 1 row(s)
```
### Does this PR introduce _any_ user-facing change?
Yes. Log messages are reduced for the spark-sql CLI as follows.
```
20/09/02 00:34:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/09/02 00:34:53 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
20/09/02 00:34:53 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
20/09/02 00:34:55 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
20/09/02 00:34:55 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore kou192.168.1.204
Spark master: local[*], Application Id: local-1598974492822
spark-sql> SELECT now();
2020-09-02 00:35:05.258
Time taken: 2.299 seconds, Fetched 1 row(s)
```
### How was this patch tested?
Launched spark-sql CLI and confirmed that log messages are reduced as I paste above.
Closes#29619 from sarutak/suppress-log-for-spark-sql.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
A struct field that appears both in GROUP BY and in an aggregate expression with CUBE/ROLLUP/GROUPING SETS fails during analysis.
```
test("SPARK-31670") {
withTable("t1") {
sql(
"""
|CREATE TEMPORARY VIEW t(a, b, c) AS
|SELECT * FROM VALUES
|('A', 1, NAMED_STRUCT('row_id', 1, 'json_string', '{"i": 1}')),
|('A', 2, NAMED_STRUCT('row_id', 2, 'json_string', '{"i": 1}')),
|('A', 2, NAMED_STRUCT('row_id', 2, 'json_string', '{"i": 2}')),
|('B', 1, NAMED_STRUCT('row_id', 3, 'json_string', '{"i": 1}')),
|('C', 3, NAMED_STRUCT('row_id', 4, 'json_string', '{"i": 1}'))
""".stripMargin)
checkAnswer(
sql(
"""
|SELECT a, c.json_string, SUM(b)
|FROM t
|GROUP BY a, c.json_string
|WITH CUBE
|""".stripMargin),
Row("A", "{\"i\": 1}", 3) :: Row("A", "{\"i\": 2}", 2) :: Row("A", null, 5) ::
Row("B", "{\"i\": 1}", 1) :: Row("B", null, 1) ::
Row("C", "{\"i\": 1}", 3) :: Row("C", null, 3) ::
Row(null, "{\"i\": 1}", 7) :: Row(null, "{\"i\": 2}", 2) :: Row(null, null, 9) :: Nil)
}
}
```
Error
```
[info] - SPARK-31670 *** FAILED *** (2 seconds, 857 milliseconds)
[info] Failed to analyze query: org.apache.spark.sql.AnalysisException: expression 't.`c`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
[info] Aggregate [a#247, json_string#248, spark_grouping_id#246L], [a#247, c#223.json_string AS json_string#241, sum(cast(b#222 as bigint)) AS sum(b)#243L]
[info] +- Expand [List(a#221, b#222, c#223, a#244, json_string#245, 0), List(a#221, b#222, c#223, a#244, null, 1), List(a#221, b#222, c#223, null, json_string#245, 2), List(a#221, b#222, c#223, null, null, 3)], [a#221, b#222, c#223, a#247, json_string#248, spark_grouping_id#246L]
[info] +- Project [a#221, b#222, c#223, a#221 AS a#244, c#223.json_string AS json_string#245]
[info] +- SubqueryAlias t
[info] +- Project [col1#218 AS a#221, col2#219 AS b#222, col3#220 AS c#223]
[info] +- Project [col1#218, col2#219, col3#220]
[info] +- LocalRelation [col1#218, col2#219, col3#220]
[info]
```
For a struct-type field, when we resolve it, it is constructed with an `Alias`. When a struct field appears in GROUP BY with CUBE/ROLLUP etc., the struct field in the grouping expressions and in the aggregate expressions is resolved with different `exprId`s, as below:
```
'Aggregate [cube(a#221, c#223.json_string AS json_string#240)], [a#221, c#223.json_string AS json_string#241, sum(cast(b#222 as bigint)) AS sum(b)#243L]
+- SubqueryAlias t
+- Project [col1#218 AS a#221, col2#219 AS b#222, col3#220 AS c#223]
+- Project [col1#218, col2#219, col3#220]
+- LocalRelation [col1#218, col2#219, col3#220]
```
This makes `ResolveGroupingAnalytics.constructAggregateExprs()` fail to replace the aggregate expression with the expanded grouping expression attribute, since their `exprId`s are not the same; then the error happens.
### Why are the changes needed?
Fix an analysis bug.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added UT
Closes#28490 from AngersZhuuuu/SPARK-31670.
Lead-authored-by: angerszhu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>