### What changes were proposed in this pull request?
For a specific stage, it is useful to show the task metrics in percentile distribution. This information can help users know whether or not there is a skew/bottleneck among tasks in a given stage. We list an example in taskMetricsDistributions.json
Similarly, it is useful to show the executor metrics in percentile distribution for a specific stage. This information can show whether or not there is a skewed load on some executors. We list an example in executorMetricsDistributions.json
We define `withSummaries` and `quantiles` query parameter in the REST API for a specific stage as:
applications/<application_id>/<application_attempt/stages/<stage_id>/<stage_attempt>?withSummaries=[true|false]& quantiles=0.05,0.25,0.5,0.75,0.95
1. withSummaries: default is false, define whether to show current stage's taskMetricsDistribution and executorMetricsDistribution
2. quantiles: default is `0.0,0.25,0.5,0.75,1.0` only effect when `withSummaries=true`, it define the quantiles we use when calculating metrics distributions.
When withSummaries=true, both task metrics in percentile distribution and executor metrics in percentile distribution are included in the REST API output. The default value of withSummaries is false, i.e. no metrics percentile distribution will be included in the REST API output.
### Why are the changes needed?
For a specific stage, it is useful to show the task metrics in percentile distribution. This information can help users know whether or not there is a skew/bottleneck among tasks in a given stage. We list an example in taskMetricsDistributions.json
### Does this PR introduce _any_ user-facing change?
User can use below restful API to get task metrics distribution and executor metrics distribution for indivial stage
```
applications/<application_id>/<application_attempt/stages/<stage_id>/<stage_attempt>?withSummaries=[true|false]
```
### How was this patch tested?
Added UT
Closes#31611 from AngersZhuuuu/SPARK-34488.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
This patch adds a config `spark.yarn.kerberos.renewal.excludeHadoopFileSystems` which lists the filesystems to be excluded from delegation token renewal at YARN.
### Why are the changes needed?
MapReduce jobs can instruct YARN to skip renewal of tokens obtained from certain hosts by specifying the hosts with configuration mapreduce.job.hdfs-servers.token-renewal.exclude=<host1>,<host2>,..,<hostN>.
But seems Spark lacks of similar option. So the job submission fails if YARN fails to renew DelegationToken for any of the remote HDFS cluster. The failure in DT renewal can happen due to many reason like Remote HDFS does not trust Kerberos identity of YARN etc. We have a customer facing such issue.
### Does this PR introduce _any_ user-facing change?
No, if the config is not set. Yes, as users can use this config to instruct YARN not to renew delegation token from certain filesystems.
### How was this patch tested?
It is hard to do unit test for this. We did verify it work from the customer using this fix in the production environment.
Closes#31761 from viirya/SPARK-34295.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
### What changes were proposed in this pull request?
1) Modify `GenericAvroSerializer` to support serialization of any `GenericContainer`
2) Register `KryoSerializer`s for `GenericData.{Array, EnumSymbol, Fixed}` using the modified `GenericAvroSerializer`
### Why are the changes needed?
Without this change, Kryo throws NPEs when trying to serialize `GenericData.{Array, EnumSymbol, Fixed}`. More details in SPARK-34477 Jira
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests for testing roundtrip serialization and deserialization of `GenericData.{Array, EnumSymbol, Fixed}` using `GenericAvroSerializer` directly and also indirectly through `KryoSerializer`
Closes#31597 from shardulm94/avro-array-serializer.
Authored-by: Shardul Mahadik <smahadik@linkedin.com>
Signed-off-by: Gengliang Wang <ltnwgl@gmail.com>
### What changes were proposed in this pull request?
SPARK-32160 add a config(`EXECUTOR_ALLOW_SPARK_CONTEXT`) to switch allow/disallow to create `SparkContext` in executors and the default value of the config is `false`
`ExternalAppendOnlyUnsafeRowArrayBenchmark` will run fail when `EXECUTOR_ALLOW_SPARK_CONTEXT` use the default value because the `ExternalAppendOnlyUnsafeRowArrayBenchmark#withFakeTaskContext` method try to create a `SparkContext` manually in Executor Side.
So the main change of this pr is set `EXECUTOR_ALLOW_SPARK_CONTEXT` to `true` to ensure `ExternalAppendOnlyUnsafeRowArrayBenchmark` run successfully.
### Why are the changes needed?
Bug fix.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test:
```
bin/spark-submit --class org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArrayBenchmark --jars spark-core_2.12-3.2.0-SNAPSHOT-tests.jar spark-sql_2.12-3.2.0-SNAPSHOT-tests.jar
```
**Before**
```
Exception in thread "main" java.lang.IllegalStateException: SparkContext should only be created and accessed on the driver.
at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$assertOnDriver(SparkContext.scala:2679)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:89)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:137)
at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArrayBenchmark$.withFakeTaskContext(ExternalAppendOnlyUnsafeRowArrayBenchmark.scala:52)
at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArrayBenchmark$.testAgainstRawArrayBuffer(ExternalAppendOnlyUnsafeRowArrayBenchmark.scala:119)
at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArrayBenchmark$.$anonfun$runBenchmarkSuite$1(ExternalAppendOnlyUnsafeRowArrayBenchmark.scala:189)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.benchmark.BenchmarkBase.runBenchmark(BenchmarkBase.scala:40)
at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArrayBenchmark$.runBenchmarkSuite(ExternalAppendOnlyUnsafeRowArrayBenchmark.scala:186)
at org.apache.spark.benchmark.BenchmarkBase.main(BenchmarkBase.scala:58)
at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArrayBenchmark.main(ExternalAppendOnlyUnsafeRowArrayBenchmark.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```
**After**
`ExternalAppendOnlyUnsafeRowArrayBenchmark` run successfully.
Closes#31939 from LuciferYang/SPARK-34832.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR fixes an issue that `col()`, `$"<name>"` and `df("name")` don't handle quoted column names like ``` `a``b.c` ```properly.
For example, if we have a following DataFrame.
```
val df1 = spark.sql("SELECT 'col1' AS `a``b.c`")
```
For the DataFrame, this query is successfully executed.
```
scala> df1.selectExpr("`a``b.c`").show
+-----+
|a`b.c|
+-----+
| col1|
+-----+
```
But the following query will fail because ``` df1("`a``b.c`") ``` throws an exception.
```
scala> df1.select(df1("`a``b.c`")).show
org.apache.spark.sql.AnalysisException: syntax error in attribute name: `a``b.c`;
at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.e$1(unresolved.scala:152)
at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.parseAttributeName(unresolved.scala:162)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveQuoted(LogicalPlan.scala:121)
at org.apache.spark.sql.Dataset.resolve(Dataset.scala:221)
at org.apache.spark.sql.Dataset.col(Dataset.scala:1274)
at org.apache.spark.sql.Dataset.apply(Dataset.scala:1241)
... 49 elided
```
### Why are the changes needed?
It's a bug.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New tests.
Closes#31854 from sarutak/fix-parseAttributeName.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Support submit to k8s only with token.
### Why are the changes needed?
Now, sumbit to k8s always need oauth files.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Before, submit job out of k8s cluster without correct ca.crt, we may get this exception:
```
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)
at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)
at sun.security.validator.Validator.validate(Validator.java:271)
at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:312)
```
When set spark.kubernetes.trust.certificates = true, we can submit only with correct token, no need to config ca.crt in local env.
Submit as:
```
bin/spark-submit \
--master $master \
--name pi \
--deploy-mode cluster \
--conf spark.kubernetes.container.image=$image \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.authenticate.submission.oauthToken=$clusterToken \
--conf spark.kubernetes.trust.certificates=true \
local:///opt/spark/examples/src/main/python/pi.py 200
```
Closes#30684 from hddong/trust-certs.
Authored-by: hongdongdong <hongdongdong@cmss.chinamobile.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
SPARK-34842 (#31012) has a typo in the type of `date_dim.d_quarter_name` in the TPCDS schema (`TPCDSBase`). This PR replace `CHAR(1)` with `CHAR(6)`. This fix comes from p28 in [the TPCDS official doc](http://www.tpc.org/tpc_documents_current_versions/pdf/tpc-ds_v2.9.0.pdf).
### Why are the changes needed?
Bugfix.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
N/A
Closes#31943 from maropu/SPARK-34083-FOLLOWUP.
Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
1. Add new expression `MultiplyYMInterval` which multiplies a `YearMonthIntervalType` expression by a `NumericType` expression including ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType.
2. Extend binary arithmetic rules to support `numeric * year-month interval` and `year-month interval * numeric`.
### Why are the changes needed?
To conform the ANSI SQL standard which requires such operation over year-month intervals:
<img width="667" alt="Screenshot 2021-03-22 at 16 33 16" src="https://user-images.githubusercontent.com/1580697/111997810-77d1eb80-8b2c-11eb-951d-e43911d9c5db.png">
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
By running new tests:
```
$ build/sbt "test:testOnly *IntervalExpressionsSuite"
$ build/sbt "test:testOnly *ColumnExpressionSuite"
```
Closes#31929 from MaxGekk/interval-mul-div.
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
the given example uses a non-standard syntax for CREATE TABLE, by defining the partitioning column with the other columns, instead of in PARTITION BY.
This works is this case, because the partitioning column happens to be the last column defined, but it will break if instead 'name' would be used for partitioning.
I suggest therefore to change the example to use a standard syntax, like in
https://spark.apache.org/docs/3.1.1/sql-ref-syntax-ddl-create-table-hiveformat.html
### Why are the changes needed?
To show the better documentation.
### Does this PR introduce _any_ user-facing change?
Yes, this fixes the user-facing docs.
### How was this patch tested?
CI should test it out.
Closes#31900 from robert4os/patch-1.
Authored-by: robert4os <robert4os@users.noreply.github.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
forward-port https://github.com/apache/spark/pull/31811 to master
### What changes were proposed in this pull request?
For permanent views (and the new SQL temp view in Spark 3.1), we store the view SQL text and re-parse/analyze the view SQL text when reading the view. In the case of `SELECT * FROM ...`, we want to avoid view schema change (e.g. the referenced table changes its schema) and will record the view query output column names when creating the view, so that when reading the view we can add a `SELECT recorded_column_names FROM ...` to retain the original view query schema.
In Spark 3.1 and before, the final SELECT is added after the analysis phase: https://github.com/apache/spark/blob/branch-3.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala#L67
If the view query has duplicated output column names, we always pick the first column when reading a view. A simple repro:
```
scala> sql("create view c(x, y) as select 1 a, 2 a")
res0: org.apache.spark.sql.DataFrame = []
scala> sql("select * from c").show
+---+---+
| x| y|
+---+---+
| 1| 1|
+---+---+
```
In the master branch, we will fail at the view reading time due to b891862fb6 , which adds the final SELECT during analysis, so that the query fails with `Reference 'a' is ambiguous`
This PR proposes to resolve the view query output column names from the matching attributes by ordinal.
For example, `create view c(x, y) as select 1 a, 2 a`, the view query output column names are `[a, a]`. When we reading the view, there are 2 matching attributes (e.g.`[a#1, a#2]`) and we can simply match them by ordinal.
A negative example is
```
create table t(a int)
create view v as select *, 1 as col from t
replace table t(a int, col int)
```
When reading the view, the view query output column names are `[a, col]`, and there are two matching attributes of `col`, and we should fail the query. See the tests for details.
### Why are the changes needed?
bug fix
### Does this PR introduce _any_ user-facing change?
yes
### How was this patch tested?
new test
Closes#31930 from cloud-fan/view2.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This patch proposes to add a few public API change to DS v2, to make DS v2 scan can report metrics to Spark.
Two public interfaces are added.
* `CustomMetric`: metric interface at the driver side. It basically defines how Spark aggregates task metrics with the same metric name.
* `CustomTaskMetric`: task metric reported at executors. It includes a name and long value. Spark will collect these metric values and update internal metrics.
There are two public methods added to existing public interfaces. They are optional to DS v2 implementations.
* `PartitionReader.currentMetricsValues()`: returns an array of CustomTaskMetric. Here is where the actual metrics values are collected. Empty array by default.
* `Scan.supportedCustomMetrics()`: returns an array of supported custom metrics `CustomMetric`. Empty array by default.
### Why are the changes needed?
In order to report custom metrics, we need some public API change in DS v2 to make it possible.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
This only adds interfaces. In follow-up PRs where adding implementation there will be tests added. See #31451 and #31398 for some details and manual test there.
Closes#31476 from viirya/SPARK-34366.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This bug was introduced by SPARK-30428 at Apache Spark 3.0.0.
This PR fixes `FileScan.equals()`.
### Why are the changes needed?
- Without this fix `FileScan.equals` doesn't take `fileIndex` and `readSchema` into account.
- Partition filters and data filters added to `FileScan` (in #27112 and #27157) caused that canonicalized form of some `BatchScanExec` nodes don't match and this prevents some reuse possibilities.
### Does this PR introduce _any_ user-facing change?
Yes, before this fix incorrect reuse of `FileScan` and so `BatchScanExec` could have happed causing correctness issues.
### How was this patch tested?
Added new UTs.
Closes#31848 from peter-toth/SPARK-34756-fix-filescan-equality-check.
Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Move `ExecutionListenerBus` register (both `ListenerBus` and `ContextCleaner` register) into itself.
Also with a minor change that put `registerSparkListenerForCleanup` to a better place.
### Why are the changes needed?
improve code
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass existing tests.
Closes#31919 from Ngone51/SPARK-34087-followup.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Current link for `Azure Blob Storage and Azure Datalake Gen 2` leads to AWS information. Replacing the link to point to the right page.
### Why are the changes needed?
For users to access to the correct link.
### Does this PR introduce _any_ user-facing change?
Yes, it fixes the link correctly.
### How was this patch tested?
N/A
Closes#31938 from lenadroid/patch-1.
Authored-by: Lena <alehall@microsoft.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
This patch proposes to disable fetching shuffle blocks in batch when io encryption is enabled. Adaptive Query Execution fetch contiguous shuffle blocks for the same map task in batch to reduce IO and improve performance. However, we found that batch fetching is incompatible with io encryption.
### Why are the changes needed?
Before this patch, we set `spark.io.encryption.enabled` to true, then run some queries which coalesced partitions by AEQ, may got following error message:
```14:05:52.638 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 3) (11.240.37.88 executor driver): FetchFailed(BlockManagerId(driver, 11.240.37.88, 63574, None), shuffleId=0, mapIndex=0, mapId=0, reduceId=2, message=
org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:772)
at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:845)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:129)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:494)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1437)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:200)
at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:226)
at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:841)
... 25 more
)
```
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
New tests.
Closes#31898 from hezuojiao/fetch_shuffle_in_batch.
Authored-by: hezuojiao <hezuojiao@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
We added the gnupg installation in https://github.com/apache/spark/pull/30130 , we should do apt update before gnupg isntallation, otherwise we will get a fetch error when package is updated.
See more in:
[1] http://apache-spark-developers-list.1001551.n3.nabble.com/K8s-Integration-test-is-unable-to-run-because-of-the-unavailable-libs-td30986.html
### Why are the changes needed?
add a apt-update cmd before gnupg installation to avoid invaild package cache list.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
K8s Integration test passed
Closes#31923 from Yikun/SPARK-34820.
Authored-by: Yikun Jiang <yikunkero@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Remove the unused variable 'secMgr' in SparkSubmit.scala and DriverWrapper.scala
In jira https://issues.apache.org/jira/browse/SPARK-33925, The last usage of SecurityManager in Utils.fetchFile was removed. We don't need the variable anymore
### Why are the changes needed?
For better readablity of codes
### Does this PR introduce _any_ user-facing change?
No,dev-only
### How was this patch tested?
Manually complied. Github Actions and Jenkins build should test it out as well.
Closes#31928 from Peng-Lei/rm_secMgr.
Authored-by: PengLei <18066542445@189.cn>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
Marked `RowNumberLike` and `RankLike` as not-nullable.
### Why are the changes needed?
`RowNumberLike` and `RankLike` SQL expressions never return null value. Marking them as non-nullable can have some performance benefits, because some optimizer rules apply only to non-nullable expressions
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Did not find any existing tests on the nullability of aggregate functions.
Plan stability suite partially covers this.
Closes#31924 from tanelk/SPARK-34812_nullability.
Authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Pass the raised `ImportError` on failing to import pandas/pyarrow. This will help the user identify whether pandas/pyarrow are indeed not in the environment or if they threw a different `ImportError`.
### Why are the changes needed?
This can already happen in Pandas for example where it could throw an `ImportError` on its initialisation path if `dateutil` doesn't satisfy a certain version requirement https://github.com/pandas-dev/pandas/blob/0.24.x/pandas/compat/__init__.py#L438
### Does this PR introduce _any_ user-facing change?
Yes, it will now show the root cause of the exception when pandas or arrow is missing during import.
### How was this patch tested?
Manually tested.
```python
from pyspark.sql.functions import pandas_udf
spark.range(1).select(pandas_udf(lambda x: x))
```
Before:
```
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/...//spark/python/pyspark/sql/pandas/functions.py", line 332, in pandas_udf
require_minimum_pyarrow_version()
File "/.../spark/python/pyspark/sql/pandas/utils.py", line 53, in require_minimum_pyarrow_version
raise ImportError("PyArrow >= %s must be installed; however, "
ImportError: PyArrow >= 1.0.0 must be installed; however, it was not found.
```
After:
```
Traceback (most recent call last):
File "/.../spark/python/pyspark/sql/pandas/utils.py", line 49, in require_minimum_pyarrow_version
import pyarrow
ModuleNotFoundError: No module named 'pyarrow'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/.../spark/python/pyspark/sql/pandas/functions.py", line 332, in pandas_udf
require_minimum_pyarrow_version()
File "/.../spark/python/pyspark/sql/pandas/utils.py", line 55, in require_minimum_pyarrow_version
raise ImportError("PyArrow >= %s must be installed; however, "
ImportError: PyArrow >= 1.0.0 must be installed; however, it was not found.
```
Closes#31902 from johnhany97/jayad/spark-34803.
Lead-authored-by: John Ayad <johnhany97@gmail.com>
Co-authored-by: John H. Ayad <johnhany97@gmail.com>
Co-authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Update the Avro version to 1.10.2
### Why are the changes needed?
To stay up to date with upstream and catch compatibility issues with zstd
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests
Closes#31866 from iemejia/SPARK-27733-upgrade-avro-1.10.2.
Authored-by: Ismaël Mejía <iemejia@gmail.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
### What changes were proposed in this pull request?
Use fine-grained lock in SessionCatalog.tableExists, in order to lock currentDB variable rather than lock `tableExists` method which will block inner external catalog's behaviour.
### Why are the changes needed?
We have modified the underlying hive meta store which a different hive database is placed in its own shard for performance. However, we found that the synchronized lock limits the concurrency.
### How was this patch tested?
Existing tests.
Closes#31891 from woyumen4597/SPARK-34800.
Authored-by: woyumen4597 <woyumen4597@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Now that all the temporary views are wrapped with `TemporaryViewRelation`(#31273, #31652, and #31825), this PR proposes to update `SessionCatalog`'s APIs for temporary views to take or return more concrete types.
APIs that will take `TemporaryViewRelation` instead of `LogicalPlan`:
```
createTempView, createGlobalTempView, alterTempViewDefinition
```
APIs that will return `TemporaryViewRelation` instead of `LogicalPlan`:
```
getRawTempView, getRawGlobalTempView
```
APIs that will return `View` instead of `LogicalPlan`:
```
getTempView, getGlobalTempView, lookupTempView
```
### Why are the changes needed?
Internal refactoring to work with more concrete types.
### Does this PR introduce _any_ user-facing change?
No, this is internal refactoring.
### How was this patch tested?
Updated existing tests affected by the refactoring.
Closes#31906 from imback82/use_temporary_view_relation.
Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR fixes the `HybridRowQueue ` to respect the configured memory mode.
Besides, this PR also refactored the constructor of `MemoryConsumer` to accept the memory mode explicitly.
### Why are the changes needed?
`HybridRowQueue` supports both onHeap and offHeap manipulation. But it inherited the wrong `MemoryConsumer` constructor, which hard-coded the memory mode to `onHeap`.
### Does this PR introduce _any_ user-facing change?
No. (Maybe yes in some cases where users can't complete the job before could complete successfully after the fix because of `HybridRowQueue` is able to spill under offHeap mode now. )
### How was this patch tested?
Updated the existing test to make it test both offHeap and onHeap modes.
Closes#31152 from Ngone51/fix-MemoryConsumer-memorymode.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR updates CSVBenchmark especially we have a fix like https://github.com/apache/spark/pull/31858 that could potentially improve the performance.
### Why are the changes needed?
To have the updated benchmark results.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually ran the benchmark
Closes#31917 from HyukjinKwon/SPARK-34815.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
This PR proposes to optimize WAL commit phase via following changes:
* cache offset log to avoid FS get operation per batch
* just directly delete instead of employing FS list operation on purge
### Why are the changes needed?
There're inefficiency on WAL commit phase which can be easily optimized via using a small driver memory.
1. To provide the offset metadata to source side (via `source.commit()`), we read offset metadata for previous batch from file system, which is probably written by this driver in previous batches. Caching it into driver memory would reduce the get operation.
2. Spark calls purge against offset log & commit log per batch, which calls list operation. If the previous batch succeeded to purge, the current batch just needs to check one batch which can be simply done via direct delete operation, instead of calling list operation.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually tested with additional debug log. (Verified that cache is used, cache keeps the size as 2, only one delete call is used instead of list call)
Did some experiment with simple rate to console query. (NOTE: wasn't done with master branch - tested against Spark 2.4.x, but WAL commit phase hasn't been changed AFAIK during these versions)
AWS S3 + S3 guard:
> before the patch
<img width="1075" alt="aws-before" src="https://user-images.githubusercontent.com/1317309/107108721-6cc54380-687d-11eb-8f10-b906b9d58397.png">
> after the patch
<img width="1071" alt="aws-after" src="https://user-images.githubusercontent.com/1317309/107108724-7189f780-687d-11eb-88da-26912ac15c85.png">
Azure:
> before the patch
<img width="1074" alt="azure-before" src="https://user-images.githubusercontent.com/1317309/107108726-75b61500-687d-11eb-8c06-9048fa10ff9a.png">
> after the patch
<img width="1069" alt="azure-after" src="https://user-images.githubusercontent.com/1317309/107108729-79e23280-687d-11eb-8d97-e7f3aeec51be.png">
Closes#31495 from HeartSaVioR/SPARK-34383.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
### What changes were proposed in this pull request?
This PR proposes to reorder the items in User Guide in PySpark documentation in order to place general guides first and advance ones later.
### Why are the changes needed?
For users to more easily follow.
### Does this PR introduce _any_ user-facing change?
Yes, it changes the order in the items in documentation .
### How was this patch tested?
Manually verified the documentation after building:
<img width="768" alt="Screen Shot 2021-03-22 at 2 38 41 PM" src="https://user-images.githubusercontent.com/6477701/111945072-5537d680-8b1c-11eb-9f43-02f3ad63a509.png">
FWIW, the current page: https://spark.apache.org/docs/latest/api/python/user_guide/index.htmlCloses#31922 from HyukjinKwon/SPARK-34818.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
- Create a new rule `ResolveStreamWrite` for all analysis logic for streaming write.
- Add corresponding logical plans `WriteToStreamStatement` and `WriteToStream`.
### Why are the changes needed?
Currently, the analysis logic for streaming write is mixed in StreamingQueryManager. If we create a specific analyzer rule and separated logical plans, it should be helpful for further extension.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes#31842 from xuanyuanking/SPARK-34748.
Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This reverts commit 86ea520320.
### Why are the changes needed?
The test added in the change was flaky.
Closes#31918 from bozhang2820/revert-spark-34757.
Authored-by: Bo Zhang <bo.zhang@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR fixes an issue that `addFile` and `addJar` further encode even though a URI form string is passed.
For example, the following operation will throw exception even though the file exists.
```
sc.addFile("file:/foo/test%20file.txt")
```
Another case is `--files` and `--jars` option when we submit an application.
```
bin/spark-shell --files "/foo/test file.txt"
```
The path above is transformed to URI form [here](ecf4811764/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala (L400)) and passed to `addFile` so the same issue happens.
### Why are the changes needed?
This is a bug.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New test.
Closes#31718 from sarutak/fix-uri-encode-double.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
### What changes were proposed in this pull request?
Consistently correct the spelling of `PushedFilters`
### Why are the changes needed?
bersprockets noted that it's wrong
### Does this PR introduce _any_ user-facing change?
Technically, I think it does. Practically, neither Google nor GitHub show anyone using `pushedFilers` outside of forks (or the discussion about fixing it started at https://github.com/apache/spark/pull/30323#issuecomment-725568719)
### How was this patch tested?
None beyond CI in the previous PR
Closes#30678 from jsoref/spelling-filters.
Authored-by: Josh Soref <jsoref@users.noreply.github.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
1, add a new param `sorted` in `slice`;
2, in `VectorSlicer`, set `sorted = true` if input indices are ordered.
### Why are the changes needed?
The input indices of VectorSlicer are probably ordered.
VectorSlicer should use this attribute if possible.
I did a simple test and `sorted = true` maybe about 70% faster than existing `slice`
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
added testsuite
Closes#31588 from zhengruifeng/vector_slice_for_sorted_indices.
Authored-by: Ruifeng Zheng <ruifengz@foxmail.com>
Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
### What changes were proposed in this pull request?
This PR aims to disable a new test case using Hive 1.2.1 from Java9+ test environment.
### Why are the changes needed?
[HIVE-6113](https://issues.apache.org/jira/browse/HIVE-6113) upgraded Datanucleus to 4.x at Hive 2.0. Datanucleus 3.x doesn't support Java9+.
**Java 9+ Environment**
```
$ build/sbt "hive/testOnly *.HiveSparkSubmitSuite -- -z SPARK-34772" -Phive
...
[info] *** 1 TEST FAILED ***
[error] Failed: Total 1, Failed 1, Errors 0, Passed 0
[error] Failed tests:
[error] org.apache.spark.sql.hive.HiveSparkSubmitSuite
[error] (hive / Test / testOnly) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 328 s (05:28), completed Mar 21, 2021, 5:32:39 PM
```
### Does this PR introduce _any_ user-facing change?
Fix the UT in Java9+ environment.
### How was this patch tested?
Manually.
```
$ build/sbt "hive/testOnly *.HiveSparkSubmitSuite -- -z SPARK-34772" -Phive
...
[info] HiveSparkSubmitSuite:
[info] - SPARK-34772: RebaseDateTime loadRebaseRecords should use Spark classloader instead of context !!! CANCELED !!! (26 milliseconds)
[info] org.apache.commons.lang3.SystemUtils.isJavaVersionAtLeast(JAVA_9) was true (HiveSparkSubmitSuite.scala:344)
```
Closes#31916 from dongjoon-hyun/SPARK-HiveSparkSubmitSuite.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR aims to enable `spark.hadoopRDD.ignoreEmptySplits` by default for Apache Spark 3.2.0.
### Why are the changes needed?
Although this is a safe improvement, this hasn't been enabled by default to avoid the explicit behavior change. This PR aims to switch the default explicitly in Apache Spark 3.2.0.
### Does this PR introduce _any_ user-facing change?
Yes, the behavior change is documented.
### How was this patch tested?
Pass the existing CIs.
Closes#31909 from dongjoon-hyun/SPARK-34809.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Like we redact secrets and tokens, this PR aims to redact access key.
### Why are the changes needed?
Access key is also worth to hide.
### Does this PR introduce _any_ user-facing change?
This will hide this information from SparkUI (`Spark Properties` and `Hadoop Properties` and logs).
### How was this patch tested?
Pass the newly updated UT.
Closes#31912 from dongjoon-hyun/SPARK-34811.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This pr upgrade Jackson to 2.12.2.
Jackson Release 2.12: https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.12
### Why are the changes needed?
Make it easy to upgrade Avro 1.10.2.
```
[error] Caused by: sbt.ForkMain$ForkError: com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.11.4 requires Jackson Databind version >= 2.11.0 and < 2.12.0
[error] at com.fasterxml.jackson.module.scala.JacksonModule.setupModule(JacksonModule.scala:61)
[error] at com.fasterxml.jackson.module.scala.JacksonModule.setupModule$(JacksonModule.scala:46)
[error] at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:17)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested with Avro 1.10.2 and Parquet 1.12.0: https://github.com/apache/spark/runs/2157735537Closes#31878 from xclyfe/SPARK-34784.
Authored-by: Zhang, Xingchao <xingczhang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
### What changes were proposed in this pull request?
This PR aims to update `PostgresIntegrationSuite` with the latest results.
### Why are the changes needed?
The latest PostgreSQL jar version is 42.2.19. Since 42.2.9, the test is broken because it returns `0.0` instead of `0.00`.
- https://jdbc.postgresql.org/documentation/changelog.html#version_42.2.19
42.2.9 (2019-12-06)
42.2.10 (2020-01-30)
42.2.11 (2020-03-09)
42.2.12 (2020-03-31)
42.2.13 (2020-06-04)
42.2.14 (2020-06-10)
42.2.15 (2020-08-14)
42.2.16 (2020-08-20)
42.2.17 (2020-10-09)
42.2.18 (2020-10-15)
42.2.19 (2021-02-18)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CI with the updated test cases.
```
build/sbt -Pdocker-integration-tests 'docker-integration-tests/testOnly org.apache.spark.sql.jdbc.PostgresIntegrationSuite'
```
Closes#31910 from williamhyun/pg.
Authored-by: William Hyun <williamhyun3@gmail.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
### What changes were proposed in this pull request?
This PR fixes an incompatible behavior introduced by #31754.
The problem is that quoted name parts represented as a string are given to the constructor of `UnresolvedAttribute` which takes single string parameter, `sql` method invocation against the `UnresolvedAttrribute` returns different result than before.
One example is ``` UnresolvedAttribute("`a.b`").sql ```. This returned `a.b` before but it doesn't now.
See [this duscussion](https://github.com/apache/spark/pull/31754/files#r597181927) for more details.
### Why are the changes needed?
For compatibility.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New assertion.
Closes#31885 from sarutak/followup-SPARK-34636.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Improve `PropagateEmptyRelation` to support join with false condition. For example:
```sql
SELECT * FROM t1 LEFT JOIN t2 ON false
```
Before this pr:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastNestedLoopJoin BuildRight, LeftOuter, false
:- FileScan parquet default.t1[a#4L]
+- BroadcastExchange IdentityBroadcastMode, [id=#40]
+- FileScan parquet default.t2[b#5L]
```
After this pr:
```
== Physical Plan ==
*(1) Project [a#4L, null AS b#5L]
+- *(1) ColumnarToRow
+- FileScan parquet default.t1[a#4L]
```
### Why are the changes needed?
Avoid `BroadcastNestedLoopJoin` to improve query performance.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
Closes#31857 from wangyum/SPARK-28220.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
### What changes were proposed in this pull request?
Document `mode` as a supported Imputer strategy in Pyspark docs.
### Why are the changes needed?
Support was added in 3.1, and documented in Scala, but some Python docs were missed.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
Closes#31883 from srowen/ImputerModeDocs.
Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
Since Spark 3.0, the `libthrift` has been bumped up from 0.9.3 to 0.12.0.
Due to THRIFT-4805, The SparkThrift Server will print annoying TExceptions. For example, the current thrift server module test in Github action workflow outputs more than 200MB of data for this error only
```java
org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:374)
at org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:451)
at org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:433)
at org.apache.thrift.transport.TSaslServerTransport.read(TSaslServerTransport.java:43)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:53)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
I checked the latest `hive-service-rpc` module in the maven center, https://mvnrepository.com/artifact/org.apache.hive/hive-service-rpc/3.1.2. It still uses the 0.9.3 version.
Unfortunately, I tried the newly released `libthrift 0.14.1`(w/o shading it), it breaks the metastore client side.
```scala
java.lang.NoSuchMethodError: org.apache.thrift.transport.TSocket.<init>(Ljava/lang/String;II)V
```
On the Thrift side, they just muted it see https://issues.apache.org/jira/browse/THRIFT-4805
So in this PR, I add a filter to suppress the warning
### Why are the changes needed?
if the log is too large, the Github action might truncate it. We need to reduce useless output.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
```build/sbt "hive-thriftserver/testOnly *ThriftServerQueryTestSuite" -Phive-thriftserver``` locally
#### before
```java
[info] - count.sql (1 second, 537 milliseconds)
[info] - decimalArithmeticOperations.sql !!! IGNORED !!!
14:09:53.233 ERROR org.apache.thrift.server.TThreadPoolServer: Thrift error occurred during processing of message.
org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:374)
at org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:451)
at org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:433)
at org.apache.thrift.transport.TSaslServerTransport.read(TSaslServerTransport.java:43)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:53)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[info] - group-analytics.sql (4 seconds, 282 milliseconds)
[info] - csv-functions.sql (400 milliseconds)
14:09:24.234 ERROR org.apache.thrift.server.TThreadPoolServer: Thrift error occurred during processing of message.
org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:374)
at org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:451)
at org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:433)
at org.apache.thrift.transport.TSaslServerTransport.read(TSaslServerTransport.java:43)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:53)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[info] - datetime-formatting-invalid.sql (349 milliseconds)
14:09:26.544 ERROR org.apache.thrift.server.TThreadPoolServer: Thrift error occurred during processing of message.
org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:374)
at org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:451)
at org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:433)
at org.apache.thrift.transport.TSaslServerTransport.read(TSaslServerTransport.java:43)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:53)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[info] - except.sql (2 seconds, 309 milliseconds)
14:09:27.782 ERROR org.apache.thrift.server.TThreadPoolServer: Thrift error occurred during processing of message.
org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:374)
at org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:451)
at org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:433)
at org.apache.thrift.transport.TSaslServerTransport.read(TSaslServerTransport.java:43)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:53)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[info] - string-functions.sql (1 second, 237 milliseconds)
14:09:27.835 WARN org.apache.spark.sql.execution.datasources.DataSource: All paths were ignored:
14:09:29.266 ERROR org.apache.thrift.server.TThreadPoolServer: Thrift error occurred during processing of message.
org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:374)
at org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:451)
at org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:433)
at org.apache.thrift.transport.TSaslServerTransport.read(TSaslServerTransport.java:43)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:53)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
#### after
```java
[info] - null-propagation.sql (181 milliseconds)
[info] - operators.sql (1 second, 772 milliseconds)
[info] - change-column.sql (241 milliseconds)
[info] - count.sql (1 second, 665 milliseconds)
[info] - decimalArithmeticOperations.sql !!! IGNORED !!!
[info] - group-analytics.sql (3 seconds, 926 milliseconds)
[info] - inline-table.sql (247 milliseconds)
[info] - comparator.sql (223 milliseconds)
[info] - show-tblproperties.sql (148 milliseconds)
[info] - timezone.sql (105 milliseconds)
[info] - parse-schema-string.sql (193 milliseconds)
```
Closes#31895 from yaooqinn/SPARK-34128-2.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
avoid unnecessary shuffle if possible
### Why are the changes needed?
avoid unnecessary shuffle.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
added testsuites and existing testsuites
Closes#31480 from zhengruifeng/repartitionAndSortWithinPartitions_opt_II.
Authored-by: Ruifeng Zheng <ruifengz@foxmail.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
### What changes were proposed in this pull request?
This PR is to fix the LIMIT code-gen bug in https://issues.apache.org/jira/browse/SPARK-34796, where the counter variable from `BaseLimitExec` is not initialized but used in code-gen. This is because the limit counter variable will be used in upstream operators (LIMIT's child plan, e.g. `ColumnarToRowExec` operator for early termination), but in the same stage, there can be some operators doing the shortcut and not calling `BaseLimitExec`'s `doConsume()`, e.g. [HashJoin.codegenInner](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala#L402). So if we have query that `LocalLimit - BroadcastHashJoin - FileScan` in the same stage, the whole stage code-gen compilation will be failed.
Here is an example:
```
test("failed limit query") {
withTable("left_table", "empty_right_table", "output_table") {
spark.range(5).toDF("k").write.saveAsTable("left_table")
spark.range(0).toDF("k").write.saveAsTable("empty_right_table")
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
spark.sql("CREATE TABLE output_table (k INT) USING parquet")
spark.sql(
s"""
|INSERT INTO TABLE output_table
|SELECT t1.k FROM left_table t1
|JOIN empty_right_table t2
|ON t1.k = t2.k
|LIMIT 3
|""".stripMargin)
}
}
}
```
Query plan:
```
Execute InsertIntoHadoopFsRelationCommand file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table, false, Parquet, Map(path -> file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table), Append, CatalogTable(
Database: default
Table: output_table
Created Time: Thu Mar 18 21:46:26 PDT 2021
Last Access: UNKNOWN
Created By: Spark 3.2.0-SNAPSHOT
Type: MANAGED
Provider: parquet
Location: file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table
Schema: root
|-- k: integer (nullable = true)
), org.apache.spark.sql.execution.datasources.InMemoryFileIndexb25d08b, [k]
+- *(3) Project [ansi_cast(k#228L as int) AS k#231]
+- *(3) GlobalLimit 3
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#179]
+- *(2) LocalLimit 3
+- *(2) Project [k#228L]
+- *(2) BroadcastHashJoin [k#228L], [k#229L], Inner, BuildRight, false
:- *(2) Filter isnotnull(k#228L)
: +- *(2) ColumnarToRow
: +- FileScan parquet default.left_table[k#228L] Batched: true, DataFilters: [isnotnull(k#228L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:bigint>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#173]
+- *(1) Filter isnotnull(k#229L)
+- *(1) ColumnarToRow
+- FileScan parquet default.empty_right_table[k#229L] Batched: true, DataFilters: [isnotnull(k#229L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:bigint>
```
Codegen failure - https://gist.github.com/c21/ea760c75b546d903247582be656d9d66 .
The uninitialized variable `_limit_counter_1` from `LocalLimitExec` is referenced in `ColumnarToRowExec`, but `BroadcastHashJoinExec` does not call `LocalLimitExec.doConsume()` to initialize the counter variable.
The fix is to move the counter variable initialization to `doProduce()`, as in whole stage code-gen framework, `doProduce()` will definitely be called if upstream operators `doProduce()`/`doConsume()` is called.
Note: this only happens in AQE disabled case, because we have an AQE optimization rule [EliminateUnnecessaryJoin](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala#L69) to change the whole query to an empty `LocalRelation` if inner join broadcast side is empty with AQE enabled.
### Why are the changes needed?
Fix query failure.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit test in `SQLQuerySuite.scala`.
Closes#31892 from c21/limit-fix.
Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
This patch proposes to fix a bug related to `NestedColumnAliasing`. The root cause is `Window` doesn't override `producedAttributes` so `NestedColumnAliasing` rule wrongly prune attributes produced by `Window`.
The master and branch-3.1 both have this issue.
### Why are the changes needed?
It is needed to fix a bug of nested column pruning.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test.
Closes#31897 from viirya/SPARK-34776.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR aims to support remote driver/executor template files.
### Why are the changes needed?
Currently, `KubernetesUtils.loadPodFromTemplate` supports only local files.
With this PR, we can do the following.
```bash
bin/spark-submit \
...
-c spark.kubernetes.driver.podTemplateFile=s3a://dongjoon/driver.yml \
-c spark.kubernetes.executor.podTemplateFile=s3a://dongjoon/executor.yml \
...
```
### Does this PR introduce _any_ user-facing change?
Yes, this is an improvement.
### How was this patch tested?
Manual testing.
Closes#31877 from dongjoon-hyun/SPARK-34783-2.
Lead-authored-by: Dongjoon Hyun <dhyun@apple.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
For all built-in datasources, prohibit saving of year-month and day-time intervals that were introduced by SPARK-27793. We plan to support saving of such types at the milestone 2, see SPARK-27790.
### Why are the changes needed?
To improve user experience with Spark SQL, and print nicer error message. Current error message might confuse users:
```
scala> Seq(java.time.Period.ofMonths(1)).toDF.write.mode("overwrite").json("/Users/maximgekk/tmp/123")
21/03/18 22:44:35 ERROR FileFormatWriter: Aborting job 8de402d7-ab69-4dc0-aa8e-14ef06bd2d6b.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (192.168.1.66 executor driver): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:418)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:298)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:211)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1437)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to convert value 1 (class of class java.lang.Integer}) with the type of YearMonthIntervalType to JSON.
at scala.sys.package$.error(package.scala:30)
at org.apache.spark.sql.catalyst.json.JacksonGenerator.$anonfun$makeWriter$23(JacksonGenerator.scala:179)
at org.apache.spark.sql.catalyst.json.JacksonGenerator.$anonfun$makeWriter$23$adapted(JacksonGenerator.scala:176)
```
### Does this PR introduce _any_ user-facing change?
Yes. After the changes, the example above:
```
scala> Seq(java.time.Period.ofMonths(1)).toDF.write.mode("overwrite").json("/Users/maximgekk/tmp/123")
org.apache.spark.sql.AnalysisException: Cannot save interval data type into external storage.
```
### How was this patch tested?
1. Checked nested intervals:
```
scala> spark.range(1).selectExpr("""struct(timestamp'2021-01-02 00:01:02' - timestamp'2021-01-01 00:00:00')""").write.mode("overwrite").parquet("/Users/maximgekk/tmp/123")
org.apache.spark.sql.AnalysisException: Cannot save interval data type into external storage.
scala> Seq(Seq(java.time.Period.ofMonths(1))).toDF.write.mode("overwrite").json("/Users/maximgekk/tmp/123")
org.apache.spark.sql.AnalysisException: Cannot save interval data type into external storage.
```
2. By running existing test suites:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *DataSourceV2DataFrameSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *DataSourceV2SQLSuite"
```
Closes#31884 from MaxGekk/ban-save-intervals.
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
join condition 'a.attr == 'c.attr check the reference of these 2 objects which will always returns false. we need to use === instead
### Why are the changes needed?
Although this join condition always false doesn't break the test but it is not what we expected. We should fix it to avoid future confusing
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
Closes#31890 from opensky142857/SPARK-34798.
Authored-by: Hongyi Zhang <hongyzhang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
### What changes were proposed in this pull request?
This is a follow-up of https://github.com/apache/spark/pull/31671https://github.com/apache/spark/pull/31671 has an unexpected behavior change that it uses a different Hadoop conf (`sparkContext.hadoopConfiguration`) to instantiate `FileSystem`, which is used to qualify the warehouse path. Before https://github.com/apache/spark/pull/31671 , the Hadoop conf to instantiate `FileSystem` is `session.sessionState.newHadoopConf()`.
More specifically, `session.sessionState.newHadoopConf()` has more conf entries:
1. it includes configs from `SharedState.initialConfigs`
2. in includes configs from `sparkContext.conf`
This PR updates `SharedState` to use the final Hadoop conf to instantiate `FileSystem`.
### Why are the changes needed?
fix behavior change
### Does this PR introduce _any_ user-facing change?
yes, the behavior will be the same before https://github.com/apache/spark/pull/31671
### How was this patch tested?
manually check the log of `FileSystem` and verify the passed in configs.
Closes#31868 from cloud-fan/followup.
Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>