### What changes were proposed in this pull request?
In Spark 3.0.0, the SparkGetColumnsOperation can not recognize NULL columns but now we can because the side effect of https://issues.apache.org/jira/browse/SPARK-32696 / f14f3742e0, but the test coverage for this change was not added.
In Spark, the column size for null fields should be 1, in this PR, we set the right column size for the null type.
### Why are the changes needed?
test coverage and fix the client-side information about the null type through jdbc
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
added ut both for this pr and SPARK-32696
Closes#29687 from yaooqinn/SPARK-32826.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This patch proposes to check `ignoreNullability` parameter recursively in `equalsStructurally` method.
### Why are the changes needed?
`equalsStructurally` is used to check type equality. We can optionally ask to ignore nullability check. But the parameter `ignoreNullability` is not passed recursively down to nested types. So it produces weird error like:
```
data type mismatch: argument 3 requires array<array<string>> type, however ... is of array<array<string>> type.
```
when running the query `select aggregate(split('abcdefgh',''), array(array('')), (acc, x) -> array(array( x ) ) )`.
### Does this PR introduce _any_ user-facing change?
Yes, fixed a bug when running user query.
### How was this patch tested?
Unit tests.
Closes#29698 from viirya/SPARK-32819.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR aims to fix the test coverage at `DataStreamReaderWriterSuite`.
### Why are the changes needed?
Currently, the test case checks `DataStreamReader` options instead of `DataStreamWriter` options.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the revised test case.
Closes#29701 from dongjoon-hyun/SPARK-32836.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR is a follow up to https://github.com/apache/spark/pull/29543#discussion_r485409606, which correctly points out that the check for the empty string is not necessary.
### Why are the changes needed?
The unnecessary check actually could cause more confusion.
For example,
```scala
scala> Seq(1).toDF.write.option("path", "/tmp/path1").parquet("")
java.lang.IllegalArgumentException: Can not create a Path from an empty string
at org.apache.hadoop.fs.Path.checkPathArg(Path.java:168)
```
even when `path` option is available. This PR addresses to fix this confusion.
### Does this PR introduce _any_ user-facing change?
Yes, now the above example prints the consistent exception message whether the path parameter value is empty or not.
```scala
scala> Seq(1).toDF.write.option("path", "/tmp/path1").parquet("")
org.apache.spark.sql.AnalysisException: There is a 'path' option set and save() is called with a path parameter. Either remove the path option, or call save() without the parameter. To ignore this check, set 'spark.sql.legacy.pathOptionBehavior.enabled' to 'true'.;
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:290)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:856)
... 47 elided
```
### How was this patch tested?
Added unit tests.
Closes#29697 from imback82/SPARK-32516-followup.
Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Make MicroBatchExecution explicitly call `getBatch` when the start and end offsets are the same.
### Why are the changes needed?
Structured Streaming micro-batch engine has the contract with V1 data sources that, after a restart, it will call `source.getBatch()` on the last batch attempted before the restart. However, a very rare combination of sequences violates this contract. It occurs only when
- The streaming query has specific types of stateful operations with watermarks (e.g., aggregation in append, mapGroupsWithState with timeouts).
- These queries can execute a batch even without new data when the previous updates the watermark and the stateful ops are such that the new watermark can cause new output/cleanup. Such batches are called no-data-batches.
- The last batch before termination was an incomplete no-data-batch. Upon restart, the micro-batch engine fails to call `source.getBatch` when attempting to re-execute the incomplete no-data-batch.
This occurs because no-data-batches has the same and end offsets, and when a batch is executed, if the start and end offset is same then calling `source.getBatch` is skipped as it is assumed the generated plan will be empty. This only affects V1 data sources like Delta and Autoloader which rely on this invariant to detect in the source whether the query is being started from scratch or restarted.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New unit test with a mock v1 source that fails without the fix.
Closes#29651 from tdas/SPARK-32794.
Authored-by: Tathagata Das <tathagata.das1565@gmail.com>
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
### What changes were proposed in this pull request?
`--` method of `AttributeSet` behave differently under Scala 2.12 and 2.13 because `--` method of `LinkedHashSet` in Scala 2.13 can't maintains the insertion order.
This pr use a Scala 2.12 based code to ensure `--` method of AttributeSet have same behavior under Scala 2.12 and 2.13.
### Why are the changes needed?
The behavior of `AttributeSet` needs to be compatible with Scala 2.12 and 2.13
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Scala 2.12: Pass the Jenkins or GitHub Action
Scala 2.13: Manual test sub-suites of `PlanStabilitySuite`
- **Before** :293 TESTS FAILED
- **After**:13 TESTS FAILED(The remaining failures are not associated with the current issue)
Closes#29689 from LuciferYang/SPARK-32755-FOLLOWUP.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
The purpose of this pr is to partial resolve [SPARK-32808](https://issues.apache.org/jira/browse/SPARK-32808), total of 26 failed test cases were fixed, the related suite as follow:
- `StreamingAggregationSuite` related test cases (2 FAILED -> Pass)
- `GeneratorFunctionSuite` related test cases (2 FAILED -> Pass)
- `UDFSuite` related test cases (2 FAILED -> Pass)
- `SQLQueryTestSuite` related test cases (5 FAILED -> Pass)
- `WholeStageCodegenSuite` related test cases (1 FAILED -> Pass)
- `DataFrameSuite` related test cases (3 FAILED -> Pass)
- `OrcV1QuerySuite\OrcV2QuerySuite` related test cases (4 FAILED -> Pass)
- `ExpressionsSchemaSuite` related test cases (1 FAILED -> Pass)
- `DataFrameStatSuite` related test cases (1 FAILED -> Pass)
- `JsonV1Suite\JsonV2Suite\JsonLegacyTimeParserSuite` related test cases (6 FAILED -> Pass)
The main change of this pr as following:
- Fix Scala 2.13 compilation problems in `ShuffleBlockFetcherIterator` and `Analyzer`
- Specified `Seq` to `scala.collection.Seq` in `objects.scala` and `GenericArrayData` because internal use `Seq` maybe `mutable.ArraySeq` and not easy to call `.toSeq`
- Should specified `Seq` to `scala.collection.Seq` when we call `Row.getAs[Seq]` and `Row.get(i).asInstanceOf[Seq]` because the data maybe `mutable.ArraySeq` but `Seq` is `immutable.Seq` in Scala 2.13
- Use a compatible way to let `+` and `-` method of `Decimal` having the same behavior in Scala 2.12 and Scala 2.13
- Call `toList` in `RelationalGroupedDataset.toDF` method when `groupingExprs` is `Stream` type because `Stream` can't serialize in Scala 2.13
- Add a manual sort to `classFunsMap` in `ExpressionsSchemaSuite` because `Iterable.groupBy` in Scala 2.13 has different result with `TraversableLike.groupBy` in Scala 2.12
### Why are the changes needed?
We need to support a Scala 2.13 build.
### Does this PR introduce _any_ user-facing change?
Should specified `Seq` to `scala.collection.Seq` when we call `Row.getAs[Seq]` and `Row.get(i).asInstanceOf[Seq]` because the data maybe `mutable.ArraySeq` but the `Seq` is `immutable.Seq` in Scala 2.13
### How was this patch tested?
- Scala 2.12: Pass the Jenkins or GitHub Action
- Scala 2.13: Do the following:
```
dev/change-scala-version.sh 2.13
mvn clean install -DskipTests -pl sql/core -Pscala-2.13 -am
mvn test -pl sql/core -Pscala-2.13
```
**Before**
```
Tests: succeeded 8166, failed 319, canceled 1, ignored 52, pending 0
*** 319 TESTS FAILED ***
```
**After**
```
Tests: succeeded 8204, failed 286, canceled 1, ignored 52, pending 0
*** 286 TESTS FAILED ***
```
Closes#29660 from LuciferYang/SPARK-32808.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
It should check `IPython` as it's imported as a package. Currently, Sphinx build is being skipped in GitHub Actions as below:
https://github.com/apache/spark/runs/1084164546
```
starting python compilation test...
python compilation succeeded.
starting pycodestyle test...
pycodestyle checks passed.
starting flake8 test...
flake8 checks passed.
python3 does not have ipython installed. Skipping Sphinx build for now.
all lint-python tests passed!
```
### Why are the changes needed?
To run the documentation builds in Github Actions.
### Does this PR introduce _any_ user-facing change?
No, dev-only
### How was this patch tested?
Manually tested as `dev/lint-python`.
Closes#29679 from HyukjinKwon/follow-ipython.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
### What changes were proposed in this pull request?
If no active SparkSession is available, let `FileSourceScanExec.needsUnsafeRowConversion` look at default SQL config of ParquetSource vectorized reader instead of failing the query execution.
### Why are the changes needed?
Fix a bug that if no active SparkSession is available, file-based data source scan for Parquet Source will throw exception.
### Does this PR introduce _any_ user-facing change?
Yes, this change fixes the bug.
### How was this patch tested?
Unit test.
Closes#29667 from viirya/SPARK-32813.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Fixes the master UI for properly summing the resources total across multiple workers.
field:
Resources in use: 0 / 8 gpu
The bug here is that it was creating MutableResourceInfo and then reducing using the + operator. the + operator in MutableResourceInfo simple adds the address from one to the addresses of the other. But its using a HashSet so if the addresses are the same then you lose the correct amount. ie worker1 has gpu addresses 0,1,2,3 and worker2 has addresses 0,1,2,3 then you only see 4 total GPUs when there are 8.
In this case we don't really need to create the MutableResourceInfo at all because we just want the sums for used and total so just remove the use of it. The other uses of it are per Worker so those should be ok.
### Why are the changes needed?
fix UI
### Does this PR introduce _any_ user-facing change?
UI
### How was this patch tested?
tested manually on standalone cluster with multiple workers and multiple GPUs and multiple fpgas
Closes#29683 from tgravescs/SPARK-32823.
Lead-authored-by: Thomas Graves <tgraves@nvidia.com>
Co-authored-by: Thomas Graves <tgraves@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
In the PR, I propose to move the test `SPARK-32810: CSV and JSON data sources should be able to read files with escaped glob metacharacter in the paths` from `DataFrameReaderWriterSuite` to `CSVSuite` and to `JsonSuite`. This will allow to run the same test in `CSVv1Suite`/`CSVv2Suite` and in `JsonV1Suite`/`JsonV2Suite`.
### Why are the changes needed?
To improve test coverage by checking JSON/CSV datasources v1 and v2.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
By running affected test suites:
```
$ build/sbt "sql/test:testOnly org.apache.spark.sql.execution.datasources.csv.*"
$ build/sbt "sql/test:testOnly org.apache.spark.sql.execution.datasources.json.*"
```
Closes#29684 from MaxGekk/globbing-paths-when-inferring-schema-dsv2.
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
If the user forgets to specify .amount on a resource config like spark.executor.resource.gpu, the error message thrown is very confusing:
```
ERROR SparkContext: Error initializing SparkContext.java.lang.StringIndexOutOfBoundsException: String index out of range:
-1 at java.lang.String.substring(String.java:1967) at
```
This makes it so we have a readable error thrown
### Why are the changes needed?
confusing error for users
### Does this PR introduce _any_ user-facing change?
just error message
### How was this patch tested?
Tested manually on standalone cluster
Closes#29685 from tgravescs/SPARK-32824.
Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Fix indentation and clean up view in the test added by https://github.com/apache/spark/pull/29593.
### Why are the changes needed?
Address review comments in https://github.com/apache/spark/pull/29665.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Updated test.
Closes#29682 from manuzhang/spark-32753-followup.
Authored-by: manuzhang <owenzhang1990@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
In the PR, I propose to fix an issue with LibSVM datasource when both of the following are true:
* no user specified schema
* some file paths contain escaped glob metacharacters, such as `[``]`, `{``}`, `*` etc.
The fix is based on another bug fix for CSV/JSON datasources https://github.com/apache/spark/pull/29659.
### Why are the changes needed?
To fix the issue when the follow two queries try to read from paths `[abc]`:
```scala
spark.read.format("libsvm").load("""/tmp/\[abc\].csv""").show
```
but would end up hitting an exception:
```
Path does not exist: file:/private/var/folders/p3/dfs6mf655d7fnjrsjvldh0tc0000gn/T/spark-6ef0ae5e-ff9f-4c4f-9ff4-0db3ee1f6a82/[abc]/part-00000-26406ab9-4e56-45fd-a25a-491c18a05e76-c000.libsvm;
org.apache.spark.sql.AnalysisException: Path does not exist: file:/private/var/folders/p3/dfs6mf655d7fnjrsjvldh0tc0000gn/T/spark-6ef0ae5e-ff9f-4c4f-9ff4-0db3ee1f6a82/[abc]/part-00000-26406ab9-4e56-45fd-a25a-491c18a05e76-c000.libsvm;
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$3(DataSource.scala:770)
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:373)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
```
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
Added UT to `LibSVMRelationSuite`.
Closes#29670 from MaxGekk/globbing-paths-when-inferring-schema-ml.
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
In `SubqueryBroadcastExec.relationFuture`, if the `broadcastRelation` is an `EmptyHashedRelation`, then `broadcastRelation.keys()` will throw `UnsupportedOperationException`.
### Why are the changes needed?
To fix a bug.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a new test.
Closes#29671 from wzhfy/dpp_empty_broadcast.
Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
Before SPARK-31511 is fixed, `BytesToBytesMap` iterator() is not thread-safe and may cause data inaccuracy.
We need to add a unit test.
### Why are the changes needed?
Increase test coverage to ensure that iterator() is thread-safe.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
add ut
Closes#29669 from cxzl25/SPARK-31511-test.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This reverts commit 04f7f6dac0 due to the discussion in [comment](https://github.com/apache/spark/pull/29589#discussion_r484657207).
### Why are the changes needed?
Based on the discussion in [comment](https://github.com/apache/spark/pull/29589#discussion_r484657207), propagation for thread local properties in `SubqueryBroadcastExec` is not necessary, since they will be propagated by broadcast exchange threads anyway.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Also revert the added test.
Closes#29674 from wzhfy/revert_dpp_thread_local.
Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
The motivation of this PR is to avoid caching the removed decommissioned executors in `TaskSchedulerImpl`. The cache is introduced in https://github.com/apache/spark/pull/29422. The cache will hold the `isHostDecommissioned` info for a while. So if the task `FetchFailure` event comes after the executor loss event, `DAGScheduler` can still get the `isHostDecommissioned` from the cache and unregister the host shuffle map status when the host is decommissioned too.
This PR tries to achieve the same goal without the cache. Instead of saving the `workerLost` in `ExecutorUpdated` / `ExecutorDecommissionInfo` / `ExecutorDecommissionState`, we could save the `hostOpt` directly. When the host is decommissioned or lost too, the `hostOpt` can be a specific host address. Otherwise, it's `None` to indicate that only the executor is decommissioned or lost.
Now that we have the host info, we can also unregister the host shuffle map status when `executorLost` is triggered for the decommissioned executor.
Besides, this PR also includes a few cleanups around the touched code.
### Why are the changes needed?
It helps to unregister the shuffle map status earlier for both decommission and normal executor lost cases.
It also saves memory in `TaskSchedulerImpl` and simplifies the code a little bit.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This PR only refactor the code. The original behaviour should be covered by `DecommissionWorkerSuite`.
Closes#29579 from Ngone51/impr-decom.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This is a Spark 3.0 regression introduced by https://github.com/apache/spark/pull/26761. We missed a corner case that `java.lang.Double.compare` treats 0.0 and -0.0 as different, which breaks SQL semantic.
This PR adds back the `OrderingUtil`, to provide custom compare methods that take care of 0.0 vs -0.0
### Why are the changes needed?
Fix a correctness bug.
### Does this PR introduce _any_ user-facing change?
Yes, now `SELECT 0.0 > -0.0` returns false correctly as Spark 2.x.
### How was this patch tested?
new tests
Closes#29647 from cloud-fan/float.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
In certain environments, seems it fails to run `run-tests.py` script as below:
```
Traceback (most recent call last):
File "<string>", line 1, in <module>
...
raise RuntimeError('''
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
Traceback (most recent call last):
...
raise EOFError
EOFError
```
The reason is that `Manager.dict()` launches another process when the main process is initiated.
It works in most environments for an unknown reason but it should be good to avoid such pattern as guided from Python itself.
### Why are the changes needed?
To prevent the test failure for Python.
### Does this PR introduce _any_ user-facing change?
No, it fixes a test script.
### How was this patch tested?
Manually ran the script after fixing.
```
Running PySpark tests. Output is in /.../python/unit-tests.log
Will test against the following Python executables: ['/.../python3', 'python3.8']
Will test the following Python tests: ['pyspark.sql.dataframe']
/.../python3 python_implementation is CPython
/.../python3 version is: Python 3.8.5
python3.8 python_implementation is CPython
python3.8 version is: Python 3.8.5
Starting test(/.../python3): pyspark.sql.dataframe
Starting test(python3.8): pyspark.sql.dataframe
Finished test(/.../python3): pyspark.sql.dataframe (33s)
Finished test(python3.8): pyspark.sql.dataframe (34s)
Tests passed in 34 seconds
```
Closes#29666 from itholic/SPARK-32812.
Authored-by: itholic <haejoon309@naver.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR proposes to document the way of debugging PySpark. It's pretty much self-descriptive.
I made a demo site to review it more effectively: https://hyukjin-spark.readthedocs.io/en/stable/development/debugging.html
### Why are the changes needed?
To let users know how to debug PySpark applications.
### Does this PR introduce _any_ user-facing change?
Yes, it adds a new page in the documentation about debugging PySpark.
### How was this patch tested?
Manually built the doc.
Closes#29639 from HyukjinKwon/SPARK-32186.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/29485
It moves the plan rewriting methods from `Analyzer` to `QueryPlan`, so that it can work with `SparkPlan` as well. This PR also does an improvement to support a corner case (The attribute to be replace stays together with an unresolved attribute), and make it more general, so that `WidenSetOperationTypes` can rewrite the plan in one shot like before.
### Why are the changes needed?
Code cleanup and generalize.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing test
Closes#29643 from cloud-fan/cleanup.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
In the PR, I propose to fix an issue with the CSV and JSON data sources in Spark SQL when both of the following are true:
* no user specified schema
* some file paths contain escaped glob metacharacters, such as `[``]`, `{``}`, `*` etc.
### Why are the changes needed?
To fix the issue when the follow two queries try to read from paths `[abc].csv` and `[abc].json`:
```scala
spark.read.csv("""/tmp/\[abc\].csv""").show
spark.read.json("""/tmp/\[abc\].json""").show
```
but would end up hitting an exception:
```
org.apache.spark.sql.AnalysisException: Path does not exist: file:/tmp/[abc].csv;
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:722)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:392)
```
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
Added new test cases in `DataFrameReaderWriterSuite`.
Closes#29659 from MaxGekk/globbing-paths-when-inferring-schema.
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR proposes to add new argument `allowMissingColumns` to `unionByName` for allowing users to specify whether to allow missing columns or not.
### Why are the changes needed?
To expose `allowMissingColumns` argument in Python API also. Currently this is only exposed in Scala/Java APIs.
### Does this PR introduce _any_ user-facing change?
Yes, it adds a new examples with new argument in the docstring.
### How was this patch tested?
Doctest added and manually tested
```
$ python/run-tests --testnames pyspark.sql.dataframe
Running PySpark tests. Output is in /.../spark/python/unit-tests.log
Will test against the following Python executables: ['/.../python3', 'python3.8']
Will test the following Python tests: ['pyspark.sql.dataframe']
/.../python3 python_implementation is CPython
/.../python3 version is: Python 3.8.5
python3.8 python_implementation is CPython
python3.8 version is: Python 3.8.5
Starting test(/.../python3): pyspark.sql.dataframe
Starting test(python3.8): pyspark.sql.dataframe
Finished test(python3.8): pyspark.sql.dataframe (35s)
Finished test(/.../python3): pyspark.sql.dataframe (35s)
Tests passed in 35 seconds
```
Closes#29657 from itholic/SPARK-32798.
Authored-by: itholic <haejoon309@naver.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Only copy tags to node with no tags when transforming plans.
### Why are the changes needed?
cloud-fan [made a good point](https://github.com/apache/spark/pull/29593#discussion_r482013121) that it doesn't make sense to append tags to existing nodes when nodes are removed. That will cause such bugs as duplicate rows when deduplicating and repartitioning by the same column with AQE.
```
spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
val df = spark.sql("select id from v1 group by id distribute by id")
println(df.collect().toArray.mkString(","))
println(df.queryExecution.executedPlan)
// With AQE
[4],[0],[3],[2],[1],[7],[6],[8],[5],[9],[4],[0],[3],[2],[1],[7],[6],[8],[5],[9]
AdaptiveSparkPlan(isFinalPlan=true)
+- CustomShuffleReader local
+- ShuffleQueryStage 0
+- Exchange hashpartitioning(id#183L, 10), true
+- *(3) HashAggregate(keys=[id#183L], functions=[], output=[id#183L])
+- Union
:- *(1) Range (0, 10, step=1, splits=2)
+- *(2) Range (0, 10, step=1, splits=2)
// Without AQE
[4],[7],[0],[6],[8],[3],[2],[5],[1],[9]
*(4) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
+- Exchange hashpartitioning(id#206L, 10), true
+- *(3) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
+- Union
:- *(1) Range (0, 10, step=1, splits=2)
+- *(2) Range (0, 10, step=1, splits=2)
```
It's too expensive to detect node removal so we make a compromise only to copy tags to node with no tags.
### Does this PR introduce _any_ user-facing change?
Yes. Fix a bug.
### How was this patch tested?
Add test.
Closes#29593 from manuzhang/spark-32753.
Authored-by: manuzhang <owenzhang1990@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Since [SPARK-22590](2854091d12), local property propagation is supported through `SQLExecution.withThreadLocalCaptured` in both `BroadcastExchangeExec` and `SubqueryExec` when computing `relationFuture`. This pr adds the support in `SubqueryBroadcastExec`.
### Why are the changes needed?
Local property propagation is missed in `SubqueryBroadcastExec`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add a new test.
Closes#29589 from wzhfy/thread_local.
Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
No need of using database name in `loadPartition` API of `Shim_v3_0` to get the hive table, in hive there is a overloaded method which gives hive table using table name. By using this API dependency with `SessionCatalog` can be removed in Shim layer
### Why are the changes needed?
To avoid deadlock when communicating with Hive metastore 3.1.x
```
Found one Java-level deadlock:
=============================
"worker3":
waiting to lock monitor 0x00007faf0be602b8 (object 0x00000007858f85f0, a org.apache.spark.sql.hive.HiveSessionCatalog),
which is held by "worker0"
"worker0":
waiting to lock monitor 0x00007faf0be5fc88 (object 0x0000000785c15c80, a org.apache.spark.sql.hive.HiveExternalCatalog),
which is held by "worker3"
Java stack information for the threads listed above:
===================================================
"worker3":
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCurrentDatabase(SessionCatalog.scala:256)
- waiting to lock <0x00000007858f85f0> (a org.apache.spark.sql.hive.HiveSessionCatalog)
at org.apache.spark.sql.hive.client.Shim_v3_0.loadPartition(HiveShim.scala:1332)
at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$loadPartition$1(HiveClientImpl.scala:870)
at org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$4459/1387095575.apply$mcV$sp(Unknown Source)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
at org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$2227/313239499.apply(Unknown Source)
at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
- locked <0x0000000785ef9d78> (a org.apache.spark.sql.hive.client.IsolatedClientLoader)
at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:276)
at org.apache.spark.sql.hive.client.HiveClientImpl.loadPartition(HiveClientImpl.scala:860)
at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$loadPartition$1(HiveExternalCatalog.scala:911)
at org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$4457/2037578495.apply$mcV$sp(Unknown Source)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
- locked <0x0000000785c15c80> (a org.apache.spark.sql.hive.HiveExternalCatalog)
at org.apache.spark.sql.hive.HiveExternalCatalog.loadPartition(HiveExternalCatalog.scala:890)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.loadPartition(ExternalCatalogWithListener.scala:179)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadPartition(SessionCatalog.scala:512)
at org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:383)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
- locked <0x00000007b1690ff8> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
at org.apache.spark.sql.Dataset$$Lambda$2084/428667685.apply(Unknown Source)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
at org.apache.spark.sql.Dataset$$Lambda$2085/559530590.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2093/139449177.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2086/1088974677.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.Dataset$$$Lambda$1959/1977822284.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
at org.apache.spark.sql.SparkSession$$Lambda$1899/424830920.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:45)
at java.lang.Thread.run(Thread.java:748)
"worker0":
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
- waiting to lock <0x0000000785c15c80
> (a org.apache.spark.sql.hive.HiveExternalCatalog)
at org.apache.spark.sql.hive.HiveExternalCatalog.tableExists(HiveExternalCatalog.scala:851)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.tableExists(ExternalCatalogWithListener.scala:146)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:432)
- locked <0x00000007858f85f0> (a org.apache.spark.sql.hive.HiveSessionCatalog)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireTableExists(SessionCatalog.scala:185)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadPartition(SessionCatalog.scala:509)
at org.apache.spark.sql.execution.command.LoadDataCommand.run(tables.scala:383)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
- locked <0x00000007b529af58> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
at org.apache.spark.sql.Dataset$$Lambda$2084/428667685.apply(Unknown Source)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
at org.apache.spark.sql.Dataset$$Lambda$2085/559530590.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2093/139449177.apply(Unknown Source)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.execution.SQLExecution$$$Lambda$2086/1088974677.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.Dataset$$$Lambda$1959/1977822284.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
at org.apache.spark.sql.SparkSession$$Lambda$1899/424830920.apply(Unknown Source)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anon$1.run(<console>:45)
at java.lang.Thread.run(Thread.java:748)
Found 1 deadlock.
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested using below script by executing in spark-shell and I found no dead lock
launch spark-shell using ./bin/spark-shell --conf "spark.sql.hive.metastore.jars=maven" --conf spark.sql.hive.metastore.version=3.1 --conf spark.hadoop.datanucleus.schema.autoCreateAll=true
**code**
```
def testHiveDeadLock = {
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
println("test hive DeadLock")
spark.sql("drop database if exists testDeadLock cascade")
spark.sql("create database testDeadLock")
spark.sql("use testDeadLock")
val tableCount = 100
val tableNamePrefix = "testdeadlock"
for (i <- 0 until tableCount) {
val tableName = s"$tableNamePrefix${i + 1}"
spark.sql(s"drop table if exists $tableName")
spark.sql(s"create table $tableName (a bigint) partitioned by (b bigint) stored as orc")
}
val threads = new ArrayBuffer[Thread]
for (i <- 0 until tableCount) {
threads.append(new Thread( new Runnable {
override def run: Unit = {
val tableName = s"$tableNamePrefix${i + 1}"
val rand = Random
val df = spark.range(0, 20000).toDF("a")
val location = s"/tmp/${rand.nextLong.abs}"
df.write.mode("overwrite").orc(location)
spark.sql(
s"""
LOAD DATA LOCAL INPATH '$location' INTO TABLE $tableName partition (b=$i)""")
}
}, s"worker$i"))
threads(i).start()
}
for (i <- 0 until tableCount) {
println(s"Joining with thread $i")
threads(i).join()
}
for (i <- 0 until tableCount) {
val tableName = s"$tableNamePrefix${i + 1}"
spark.sql(s"select count(*) from $tableName").show(false)
}
println("All done")
}
for(i <- 0 until 100) {
testHiveDeadLock
println(s"completed {$i}th iteration")
}
}
```
Closes#29649 from sandeep-katta/metastore3.1DeadLock.
Authored-by: sandeep.katta <sandeep.katta2007@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Change `CreateFunctionCommand` code that add class check before create function.
### Why are the changes needed?
We have different behavior between create permanent function and temporary function when function class is invaild. e.g.,
```
create function f as 'test.non.exists.udf';
-- Time taken: 0.104 seconds
create temporary function f as 'test.non.exists.udf'
-- Error in query: Can not load class 'test.non.exists.udf' when registering the function 'f', please make sure it is on the classpath;
```
And Hive also fails both of them.
### Does this PR introduce _any_ user-facing change?
Yes, user will get exception when create a invalid udf.
### How was this patch tested?
New test.
Closes#29502 from ulysses-you/function.
Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
bugfix for incomplete interval values, e.g. interval '1', interval '1 day 2', currently these cases will result null, but actually we should fail them with IllegalArgumentsException
### Why are the changes needed?
correctness
### Does this PR introduce _any_ user-facing change?
yes, incomplete intervals will throw exception now
#### before
```
bin/spark-sql -S -e "select interval '1', interval '+', interval '1 day -'"
NULL NULL NULL
```
#### after
```
-- !query
select interval '1'
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.parser.ParseException
Cannot parse the INTERVAL value: 1(line 1, pos 7)
== SQL ==
select interval '1'
```
### How was this patch tested?
unit tests added
Closes#29635 from yaooqinn/SPARK-32785.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Currently, Spark Public Rest APIs support Application attemptId except SQL API. This causes `no such app: application_X` issue when the application has `attemptId` (e.g: YARN cluster mode).
Please find existing and supported Rest endpoints with attemptId.
```
// Existing Rest Endpoints
applications/{appId}/sql
applications/{appId}/sql/{executionId}
// Rest Endpoints required support
applications/{appId}/{attemptId}/sql
applications/{appId}/{attemptId}/sql/{executionId}
```
Also fixing following compile warning on `SqlResourceSuite`:
```
[WARNING] [Warn] ~/spark/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala:67: Reference to uninitialized value edges
```
### Why are the changes needed?
This causes `no such app: application_X` issue when the application has `attemptId`.
### Does this PR introduce _any_ user-facing change?
Not yet because SQL Rest API is being planned to release with `Spark 3.1`.
### How was this patch tested?
1. New Unit tests are added for existing Rest endpoints. `attemptId` seems not coming in `local-mode` and coming in `YARN cluster mode` so could not be added for `attemptId` case (Suggestions are welcome).
2. Also, patch has been tested manually through both Spark Core and History Server Rest APIs.
Closes#29364 from erenavsarogullari/SPARK-32548.
Authored-by: Eren Avsarogullari <erenavsarogullari@gmail.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
### What changes were proposed in this pull request?
This PR is a followup on #29598 and removes the `ExpressionSet` class from the 2.13 branch.
### Why are the changes needed?
`ExpressionSet` does not extend Scala `Set` anymore and this class is no longer needed in the 2.13 branch.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passes existing tests
Closes#29648 from dbaliafroozeh/RemoveExpressionSetFrom2.13Branch.
Authored-by: Ali Afroozeh <ali.afroozeh@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR proposes to add a page to describe how to test PySpark. Note that it avoids duplication of https://spark.apache.org/developer-tools.html and it more aims to add put the relevant links together.
I made a demo site to review more effectively: https://hyukjin-spark.readthedocs.io/en/stable/development/testing.html
### Why are the changes needed?
To guide PySpark developers easily test.
### Does this PR introduce _any_ user-facing change?
Yes, it will adds a new documentation page.
### How was this patch tested?
Manually tested.
Closes#29634 from HyukjinKwon/SPARK-32783.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
`sql-expression-schema.md` automatically generated by `ExpressionsSchemaSuite`, but only expressions entries are checked in `ExpressionsSchemaSuite`. So if we manually modify the contents of the file, `ExpressionsSchemaSuite` does not necessarily guarantee the correctness of the it some times. For example, [Spark-24884](https://github.com/apache/spark/pull/27507) added `regexp_extract_all` expression support, and manually modify the `sql-expression-schema.md` but not change the content of `Number of queries` cause file content inconsistency.
Some additional checks have been added to `ExpressionsSchemaSuite` to improve the correctness guarantee of `sql-expression-schema.md` as follow:
- `Number of queries` should equals size of `expressions entries` in `sql-expression-schema.md`
- `Number of expressions that missing example` should equals size of `Expressions missing examples` in `sql-expression-schema.md`
- `MissExamples` from case should same as `expectedMissingExamples` from `sql-expression-schema.md`
### Why are the changes needed?
Ensure the correctness of `sql-expression-schema.md` content.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Enhanced ExpressionsSchemaSuite
Closes#29608 from LuciferYang/sql-expression-schema.
Authored-by: yangjie <yangjie@MacintoshdeMacBook-Pro.local>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
Remove legacy silent support mode for spark-sql CLI.
### Why are the changes needed?
https://github.com/apache/spark/pull/29619 add new silent mode. We can remove legacy silent support mode.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test:
```
spark-sql> LM-SHC-16508156:spark yumwang$ bin/spark-sql -S
NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of assembly.
20/09/03 09:06:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/09/03 09:06:16 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
20/09/03 09:06:16 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
20/09/03 09:06:19 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
20/09/03 09:06:19 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore yumwang10.226.196.190
spark-sql> select * from test1;
1
spark-sql> select * from test1;
1
```
Closes#29631 from wangyum/SPARK-32772.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
### What changes were proposed in this pull request?
The whole `DynamicPartitionPruningSuite` takes about 2 min on my laptop (either AE on or off). The slowest tests are `test("simple inner join triggers DPP with mock-up tables")` and `test("cleanup any DPP filter that isn't pushed down due to expression id clashes")`, which totally take about 1 min.
We can reuse existing test tables or use smaller tables to reduce the cost. After that, the two tests takes only about 1 sec in total, leading to 2x speedup for the suite.
### Why are the changes needed?
To speedup DPP test suites.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Modified two existing tests.
Closes#29636 from wzhfy/improve_dpp_test.
Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
This PR fixes a bug `FileSourceStrategy`, which generates partition filters even if the table is not partitioned. This can confuse `FileSourceScanExec`, which mistakenly think the table is partitioned and tries to update the `numPartitions` metrics, and cause a failure. We should not generate partition filters for non-partitioned table.
### Why are the changes needed?
The bug was exposed by https://github.com/apache/spark/pull/29436.
### Does this PR introduce _any_ user-facing change?
Yes, fix a bug.
### How was this patch tested?
new test
Closes#29637 from cloud-fan/refactor.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
### What changes were proposed in this pull request?
This PR intends to fix a bug where references can be missing when adding aliases to widen data types in `WidenSetOperationTypes`. For example,
```
CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v);
SELECT t.v FROM (
SELECT v FROM t3
UNION ALL
SELECT v + v AS v FROM t3
) t;
org.apache.spark.sql.AnalysisException: Resolved attribute(s) v#1 missing from v#3 in operator !Project [v#1]. Attribute(s) with the same name appear in the operation: v. Please check if the right attribute(s) are used.;;
!Project [v#1] <------ the reference got missing
+- SubqueryAlias t
+- Union
:- Project [cast(v#1 as decimal(11,0)) AS v#3]
: +- Project [v#1]
: +- SubqueryAlias t3
: +- SubqueryAlias tbl
: +- LocalRelation [v#1]
+- Project [v#2]
+- Project [CheckOverflow((promote_precision(cast(v#1 as decimal(11,0))) + promote_precision(cast(v#1 as decimal(11,0)))), DecimalType(11,0), true) AS v#2]
+- SubqueryAlias t3
+- SubqueryAlias tbl
+- LocalRelation [v#1]
```
In the case, `WidenSetOperationTypes` added the alias `cast(v#1 as decimal(11,0)) AS v#3`, then the reference in the top `Project` got missing. This PR correct the reference (`exprId` and widen `dataType`) after adding aliases in the rule.
### Why are the changes needed?
bugfixes
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests
Closes#29485 from maropu/SPARK-32638.
Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
LeftSemi and Existence SortMergeJoin should not buffer all matching right side rows when bound condition is empty, this is unnecessary and can lead to performance degradation especially when spilling happens.
### Why are the changes needed?
Performance improvement.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New UT and TPCDS benchmarks.
Closes#29572 from peter-toth/SPARK-32730-improve-leftsemi-sortmergejoin.
Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR changes `AttributeSet` and `ExpressionSet` to maintain the insertion order of the elements. More specifically, we:
- change the underlying data structure of `AttributeSet` from `HashSet` to `LinkedHashSet` to maintain the insertion order.
- `ExpressionSet` already uses a list to keep track of the expressions, however, since it is extending Scala's immutable.Set class, operations such as map and flatMap are delegated to the immutable.Set itself. This means that the result of these operations is not an instance of ExpressionSet anymore, rather it's a implementation picked up by the parent class. We also remove this inheritance from `immutable.Set `and implement the needed methods directly. ExpressionSet has a very specific semantics and it does not make sense to extend `immutable.Set` anyway.
- change the `PlanStabilitySuite` to not sort the attributes, to be able to catch changes in the order of expressions in different runs.
### Why are the changes needed?
Expressions identity is based on the `ExprId` which is an auto-incremented number. This means that the same query can yield a query plan with different expression ids in different runs. `AttributeSet` and `ExpressionSet` internally use a `HashSet` as the underlying data structure, and therefore cannot guarantee the a fixed order of operations in different runs. This can be problematic in cases we like to check for plan changes in different runs.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passes `PlanStabilitySuite` after regenerating the golden files.
Closes#29598 from dbaliafroozeh/FixOrderOfExpressions.
Authored-by: Ali Afroozeh <ali.afroozeh@databricks.com>
Signed-off-by: herman <herman@databricks.com>
### What changes were proposed in this pull request?
Move StreamingRelationV2 to the catalyst module and bind with the Table interface.
### Why are the changes needed?
Currently, the StreamingRelationV2 is bind with TableProvider. Since the V2 relation is not bound with `DataSource`, to make it more flexible and have better expansibility, it should be moved to the catalyst module and bound with the Table interface. We did a similar thing for DataSourceV2Relation.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing UT.
Closes#29633 from xuanyuanking/SPARK-32782.
Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR fails the interval values parsing when they contain non-ASCII characters which are silently omitted right now.
e.g. the case below should be invalid
```
select interval 'interval中文 1 day'
```
### Why are the changes needed?
bugfix, intervals should fail when containing invalid characters
### Does this PR introduce _any_ user-facing change?
yes,
#### before
select interval 'interval中文 1 day' results 1 day, now it fails with
```
org.apache.spark.sql.catalyst.parser.ParseException
Cannot parse the INTERVAL value: interval中文 1 day
```
### How was this patch tested?
new tests
Closes#29632 from yaooqinn/SPARK-32781.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR reduces log messages for spark-sql CLI like spark-shell and pyspark CLI.
### Why are the changes needed?
When we launch spark-sql CLI, too many log messages are shown and it's sometimes difficult to find the result of query.
```
spark-sql> SELECT now();
20/09/02 00:11:45 INFO CodeGenerator: Code generated in 10.121625 ms
20/09/02 00:11:45 INFO SparkContext: Starting job: main at NativeMethodAccessorImpl.java:0
20/09/02 00:11:45 INFO DAGScheduler: Got job 0 (main at NativeMethodAccessorImpl.java:0) with 1 output partitions
20/09/02 00:11:45 INFO DAGScheduler: Final stage: ResultStage 0 (main at NativeMethodAccessorImpl.java:0)
20/09/02 00:11:45 INFO DAGScheduler: Parents of final stage: List()
20/09/02 00:11:45 INFO DAGScheduler: Missing parents: List()
20/09/02 00:11:45 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
20/09/02 00:11:45 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 6.3 KiB, free 366.3 MiB)
20/09/02 00:11:45 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 3.2 KiB, free 366.3 MiB)
20/09/02 00:11:45 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.204:42615 (size: 3.2 KiB, free: 366.3 MiB)
20/09/02 00:11:45 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1348
20/09/02 00:11:45 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
20/09/02 00:11:45 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
20/09/02 00:11:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.1.204, executor driver, partition 0, PROCESS_LOCAL, 7561 bytes) taskResourceAssignments Map()
20/09/02 00:11:45 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
20/09/02 00:11:45 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1446 bytes result sent to driver
20/09/02 00:11:45 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 238 ms on 192.168.1.204 (executor driver) (1/1)
20/09/02 00:11:45 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
20/09/02 00:11:45 INFO DAGScheduler: ResultStage 0 (main at NativeMethodAccessorImpl.java:0) finished in 0.343 s
20/09/02 00:11:45 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
20/09/02 00:11:45 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
20/09/02 00:11:45 INFO DAGScheduler: Job 0 finished: main at NativeMethodAccessorImpl.java:0, took 0.377489 s
2020-09-02 00:11:45.07
Time taken: 0.704 seconds, Fetched 1 row(s)
20/09/02 00:11:45 INFO SparkSQLCLIDriver: Time taken: 0.704 seconds, Fetched 1 row(s)
```
### Does this PR introduce _any_ user-facing change?
Yes. Log messages are reduced for spark-sql CLI like as follows.
```
20/09/02 00:34:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/09/02 00:34:53 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
20/09/02 00:34:53 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
20/09/02 00:34:55 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
20/09/02 00:34:55 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore kou192.168.1.204
Spark master: local[*], Application Id: local-1598974492822
spark-sql> SELECT now();
2020-09-02 00:35:05.258
Time taken: 2.299 seconds, Fetched 1 row(s)
```
### How was this patch tested?
Launched spark-sql CLI and confirmed that log messages are reduced as I paste above.
Closes#29619 from sarutak/suppress-log-for-spark-sql.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR adds support to read host-local shuffle data from disk directly when external shuffle service is disabled.
Similar to #25299, we first try to get local disk directories for the shuffle data, which is located at the same host with the current executor. The only difference is, in #25299, it gets the directories from the external shuffle service while in this PR, it gets the directory from the executors.
To implement the feature, this PR extends the `HostLocalDirManager ` for both `ExternalBlockStoreClient` and `NettyBlockTransferService`. Also, this PR adds `getHostLocalDirs` for `NettyBlockTransferService` as `ExternalBlockStoreClient` does, in order to send the get-dir-request to the corresponding executor. And this PR resued the request message`GetLocalDirsForExecutors` for simple.
### Why are the changes needed?
After SPARK-27651 / #25299, Spark can read host-local shuffle data directly from disk when external shuffle service is enabled. To extend the future, we can also support it when the external shuffle service is disabled.
### Does this PR introduce _any_ user-facing change?
Yes. Before this PR, to use the host-local shuffle reading feature, users should not only enable `spark.shuffle.readHostLocalDisk` but also `spark.shuffle.service.enabled`. After this PR, enable `spark.shuffle.readHostLocalDisk` should be enough, and external shuffle service is no longer a pre-requirement.
### How was this patch tested?
Added test and tested manually.
Closes#28911 from Ngone51/support_node_local_shuffle.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Struct field both in GROUP BY and Aggregate Expresison with CUBE/ROLLUP/GROUPING SET will failed when analysis.
```
test("SPARK-31670") {
withTable("t1") {
sql(
"""
|CREATE TEMPORARY VIEW t(a, b, c) AS
|SELECT * FROM VALUES
|('A', 1, NAMED_STRUCT('row_id', 1, 'json_string', '{"i": 1}')),
|('A', 2, NAMED_STRUCT('row_id', 2, 'json_string', '{"i": 1}')),
|('A', 2, NAMED_STRUCT('row_id', 2, 'json_string', '{"i": 2}')),
|('B', 1, NAMED_STRUCT('row_id', 3, 'json_string', '{"i": 1}')),
|('C', 3, NAMED_STRUCT('row_id', 4, 'json_string', '{"i": 1}'))
""".stripMargin)
checkAnswer(
sql(
"""
|SELECT a, c.json_string, SUM(b)
|FROM t
|GROUP BY a, c.json_string
|WITH CUBE
|""".stripMargin),
Row("A", "{\"i\": 1}", 3) :: Row("A", "{\"i\": 2}", 2) :: Row("A", null, 5) ::
Row("B", "{\"i\": 1}", 1) :: Row("B", null, 1) ::
Row("C", "{\"i\": 1}", 3) :: Row("C", null, 3) ::
Row(null, "{\"i\": 1}", 7) :: Row(null, "{\"i\": 2}", 2) :: Row(null, null, 9) :: Nil)
}
}
```
Error
```
[info] - SPARK-31670 *** FAILED *** (2 seconds, 857 milliseconds)
[info] Failed to analyze query: org.apache.spark.sql.AnalysisException: expression 't.`c`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
[info] Aggregate [a#247, json_string#248, spark_grouping_id#246L], [a#247, c#223.json_string AS json_string#241, sum(cast(b#222 as bigint)) AS sum(b)#243L]
[info] +- Expand [List(a#221, b#222, c#223, a#244, json_string#245, 0), List(a#221, b#222, c#223, a#244, null, 1), List(a#221, b#222, c#223, null, json_string#245, 2), List(a#221, b#222, c#223, null, null, 3)], [a#221, b#222, c#223, a#247, json_string#248, spark_grouping_id#246L]
[info] +- Project [a#221, b#222, c#223, a#221 AS a#244, c#223.json_string AS json_string#245]
[info] +- SubqueryAlias t
[info] +- Project [col1#218 AS a#221, col2#219 AS b#222, col3#220 AS c#223]
[info] +- Project [col1#218, col2#219, col3#220]
[info] +- LocalRelation [col1#218, col2#219, col3#220]
[info]
```
For Struct type Field, when we resolve it, it will construct with Alias. When struct field in GROUP BY with CUBE/ROLLUP etc, struct field in groupByExpression and aggregateExpression will be resolved with different exprId as below
```
'Aggregate [cube(a#221, c#223.json_string AS json_string#240)], [a#221, c#223.json_string AS json_string#241, sum(cast(b#222 as bigint)) AS sum(b)#243L]
+- SubqueryAlias t
+- Project [col1#218 AS a#221, col2#219 AS b#222, col3#220 AS c#223]
+- Project [col1#218, col2#219, col3#220]
+- LocalRelation [col1#218, col2#219, col3#220]
```
This makes `ResolveGroupingAnalytics.constructAggregateExprs()` failed to replace aggreagteExpression use expand groupByExpression attribute since there exprId is not same. then error happened.
### Why are the changes needed?
Fix analyze bug
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
Added UT
Closes#28490 from AngersZhuuuu/SPARK-31670.
Lead-authored-by: angerszhu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Currently in DPP, left semi can only prune left, this pr makes it also support prune right.
### Why are the changes needed?
A minor improvement for DPP.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add a test case.
Closes#29582 from wzhfy/dpp_support_leftsemi_pruneRight.
Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
### What changes were proposed in this pull request?
This is a follow-up of #29160. This allows Spark SQL project to compile for Scala 2.13.
### Why are the changes needed?
It's needed for #28545
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
I compiled with Scala 2.13. It fails in `Spark REPL` project, which will be fixed by #28545Closes#29584 from karolchmist/SPARK-32364-scala-2.13.
Authored-by: Karol Chmist <info+github@chmist.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
#29129 duplicated GetCatalogsOperationMock in the hive-version-specific subdirectories, otherwise profile hive-1.2 would not compile. We can prevent duplication of this class by shimming the required hive-version-specific types.
### Why are the changes needed?
This is a cleanup to avoid duplication of a mock class.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This patch only changes tests.
Closes#29549 from alismess-db/get-catalogs-operation-mock-use-shim.
Authored-by: Ali Smesseim <ali.smesseim@databricks.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>