Commit graph

29712 commits

Author SHA1 Message Date
hezuojiao 39542bb81f [SPARK-34790][CORE] Disable fetching shuffle blocks in batch when io encryption is enabled
### What changes were proposed in this pull request?

This patch proposes to disable fetching shuffle blocks in batch when io encryption is enabled. Adaptive Query Execution fetch contiguous shuffle blocks for the same map task in batch to reduce IO and improve performance. However, we found that batch fetching is incompatible with io encryption.

### Why are the changes needed?
Before this patch, we set `spark.io.encryption.enabled` to true, then run some queries which coalesced partitions by AEQ, may got following error message:
```14:05:52.638 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 3) (11.240.37.88 executor driver): FetchFailed(BlockManagerId(driver, 11.240.37.88, 63574, None), shuffleId=0, mapIndex=0, mapId=0, reduceId=2, message=
org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:772)
	at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:845)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:129)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:494)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1437)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
	at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:200)
	at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:226)
	at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
	at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:841)
	... 25 more

)
```

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

New tests.

Closes #31898 from hezuojiao/fetch_shuffle_in_batch.

Authored-by: hezuojiao <hezuojiao@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-03-22 13:06:12 -07:00
Yikun Jiang 31da90762e [SPARK-34820][K8S][R] add apt-update before gnupg install
### What changes were proposed in this pull request?
We added the gnupg installation in https://github.com/apache/spark/pull/30130 , we should do apt update before gnupg isntallation, otherwise we will get a fetch error when package is updated.

See more in:
[1] http://apache-spark-developers-list.1001551.n3.nabble.com/K8s-Integration-test-is-unable-to-run-because-of-the-unavailable-libs-td30986.html

### Why are the changes needed?
add a apt-update cmd before gnupg installation to avoid invaild package cache list.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
K8s Integration test passed

Closes #31923 from Yikun/SPARK-34820.

Authored-by: Yikun Jiang <yikunkero@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-03-22 10:13:31 -07:00
PengLei 85581f6dac [SPARK-33925][CORE][FOLLOW-UP] Remove the unused variables 'secMgr'
### What changes were proposed in this pull request?
Remove the unused variable 'secMgr' in SparkSubmit.scala and DriverWrapper.scala
In jira https://issues.apache.org/jira/browse/SPARK-33925, The last usage of SecurityManager in Utils.fetchFile was removed. We don't need the variable anymore

### Why are the changes needed?
For better readablity of codes

### Does this PR introduce _any_ user-facing change?
No,dev-only

### How was this patch tested?
Manually complied. Github Actions and Jenkins build should test it out as well.

Closes #31928 from Peng-Lei/rm_secMgr.

Authored-by: PengLei <18066542445@189.cn>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-03-22 12:02:25 -05:00
tanel.kiis@gmail.com 51cf0cadea [SPARK-34812][SQL] RowNumberLike and RankLike should not be nullable
### What changes were proposed in this pull request?

Marked `RowNumberLike` and `RankLike` as not-nullable.

### Why are the changes needed?

`RowNumberLike` and `RankLike` SQL expressions never return null value. Marking them as non-nullable can have some performance benefits, because some optimizer rules apply only to non-nullable expressions

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Did not find any existing tests on the nullability of aggregate functions.
Plan stability suite partially covers this.

Closes #31924 from tanelk/SPARK-34812_nullability.

Authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-22 14:55:43 +00:00
John Ayad ddfc75ec64 [SPARK-34803][PYSPARK] Pass the raised ImportError if pandas or pyarrow fail to import
### What changes were proposed in this pull request?

Pass the raised `ImportError` on failing to import pandas/pyarrow. This will help the user identify whether pandas/pyarrow are indeed not in the environment or if they threw a different `ImportError`.

### Why are the changes needed?

This can already happen in Pandas for example where it could throw an `ImportError` on its initialisation path if `dateutil` doesn't satisfy a certain version requirement https://github.com/pandas-dev/pandas/blob/0.24.x/pandas/compat/__init__.py#L438

### Does this PR introduce _any_ user-facing change?

Yes, it will now show the root cause of the exception when pandas or arrow is missing during import.

### How was this patch tested?

Manually tested.

```python
from pyspark.sql.functions import pandas_udf
spark.range(1).select(pandas_udf(lambda x: x))
```

Before:

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/...//spark/python/pyspark/sql/pandas/functions.py", line 332, in pandas_udf
    require_minimum_pyarrow_version()
  File "/.../spark/python/pyspark/sql/pandas/utils.py", line 53, in require_minimum_pyarrow_version
    raise ImportError("PyArrow >= %s must be installed; however, "
ImportError: PyArrow >= 1.0.0 must be installed; however, it was not found.
```

After:

```
Traceback (most recent call last):
  File "/.../spark/python/pyspark/sql/pandas/utils.py", line 49, in require_minimum_pyarrow_version
    import pyarrow
ModuleNotFoundError: No module named 'pyarrow'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/pandas/functions.py", line 332, in pandas_udf
    require_minimum_pyarrow_version()
  File "/.../spark/python/pyspark/sql/pandas/utils.py", line 55, in require_minimum_pyarrow_version
    raise ImportError("PyArrow >= %s must be installed; however, "
ImportError: PyArrow >= 1.0.0 must be installed; however, it was not found.
```

Closes #31902 from johnhany97/jayad/spark-34803.

Lead-authored-by: John Ayad <johnhany97@gmail.com>
Co-authored-by: John H. Ayad <johnhany97@gmail.com>
Co-authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-22 23:29:28 +09:00
Ismaël Mejía 8a552bfc76 [SPARK-34778][BUILD] Upgrade to Avro 1.10.2
### What changes were proposed in this pull request?
Update the  Avro version to 1.10.2

### Why are the changes needed?
To stay up to date with upstream and catch compatibility issues with zstd

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit tests

Closes #31866 from iemejia/SPARK-27733-upgrade-avro-1.10.2.

Authored-by: Ismaël Mejía <iemejia@gmail.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-03-22 19:30:14 +08:00
woyumen4597 f44608a8c0 [SPARK-34800][SQL] Use fine-grained lock in SessionCatalog.tableExists
### What changes were proposed in this pull request?
Use fine-grained lock in SessionCatalog.tableExists, in order to lock currentDB variable rather than lock `tableExists` method which will block inner external catalog's behaviour.

### Why are the changes needed?
We have modified the underlying hive meta store which a different hive  database is placed in its own shard for performance. However, we found that the synchronized lock  limits the concurrency.

### How was this patch tested?
Existing tests.

Closes #31891 from woyumen4597/SPARK-34800.

Authored-by: woyumen4597 <woyumen4597@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-22 09:03:46 +00:00
Terry Kim 7953fcdb56 [SPARK-34700][SQL] SessionCatalog's temporary view related APIs should take/return more concrete types
### What changes were proposed in this pull request?

Now that all the temporary views are wrapped with `TemporaryViewRelation`(#31273, #31652, and #31825), this PR proposes to update `SessionCatalog`'s APIs for temporary views to take or return more concrete types.

APIs that will take `TemporaryViewRelation` instead of `LogicalPlan`:
```
createTempView, createGlobalTempView, alterTempViewDefinition
```

APIs that will return `TemporaryViewRelation` instead of `LogicalPlan`:
```
getRawTempView, getRawGlobalTempView
```

APIs that will return `View` instead of `LogicalPlan`:
```
getTempView, getGlobalTempView, lookupTempView
```

### Why are the changes needed?

Internal refactoring to work with more concrete types.

### Does this PR introduce _any_ user-facing change?

No, this is internal refactoring.

### How was this patch tested?

Updated existing tests affected by the refactoring.

Closes #31906 from imback82/use_temporary_view_relation.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-22 08:17:54 +00:00
yi.wu e4bb97526c [SPARK-34089][CORE] HybridRowQueue should respect the configured memory mode
### What changes were proposed in this pull request?

This PR fixes the `HybridRowQueue ` to respect the configured memory mode.

Besides, this PR also refactored the constructor of `MemoryConsumer` to accept the memory mode explicitly.

### Why are the changes needed?

`HybridRowQueue` supports both onHeap and offHeap manipulation. But it inherited the wrong `MemoryConsumer` constructor, which hard-coded the memory mode to `onHeap`.

### Does this PR introduce _any_ user-facing change?

No. (Maybe yes in some cases where users can't complete the job before could complete successfully after the fix because of `HybridRowQueue` is able to spill under offHeap mode now. )

### How was this patch tested?

Updated the existing test to make it test both offHeap and onHeap modes.

Closes #31152 from Ngone51/fix-MemoryConsumer-memorymode.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-22 08:12:08 +00:00
HyukjinKwon ec70467d4d [SPARK-34815][SQL] Update CSVBenchmark
### What changes were proposed in this pull request?

This PR updates CSVBenchmark especially we have a fix like https://github.com/apache/spark/pull/31858 that could potentially improve the performance.

### Why are the changes needed?

To have the updated benchmark results.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually ran the benchmark

Closes #31917 from HyukjinKwon/SPARK-34815.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-03-22 10:49:53 +03:00
Jungtaek Lim (HeartSaVioR) 121883b1a5 [SPARK-34383][SS] Optimize WAL commit phase via reducing cost of filesystem operations
### What changes were proposed in this pull request?

This PR proposes to optimize WAL commit phase via following changes:

* cache offset log to avoid FS get operation per batch
* just directly delete instead of employing FS list operation on purge

### Why are the changes needed?

There're inefficiency on WAL commit phase which can be easily optimized via using a small driver memory.

1. To provide the offset metadata to source side (via `source.commit()`), we read offset metadata for previous batch from file system, which is probably written by this driver in previous batches. Caching it into driver memory would reduce the get operation.
2. Spark calls purge against offset log & commit log per batch, which calls list operation. If the previous batch succeeded to purge, the current batch just needs to check one batch which can be simply done via direct delete operation, instead of calling list operation.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually tested with additional debug log. (Verified that cache is used, cache keeps the size as 2, only one delete call is used instead of list call)

Did some experiment with simple rate to console query. (NOTE: wasn't done with master branch - tested against Spark 2.4.x, but WAL commit phase hasn't been changed AFAIK during these versions)

AWS S3 + S3 guard:

> before the patch

<img width="1075" alt="aws-before" src="https://user-images.githubusercontent.com/1317309/107108721-6cc54380-687d-11eb-8f10-b906b9d58397.png">

> after the patch

<img width="1071" alt="aws-after" src="https://user-images.githubusercontent.com/1317309/107108724-7189f780-687d-11eb-88da-26912ac15c85.png">

Azure:

> before the patch

<img width="1074" alt="azure-before" src="https://user-images.githubusercontent.com/1317309/107108726-75b61500-687d-11eb-8c06-9048fa10ff9a.png">

> after the patch

<img width="1069" alt="azure-after" src="https://user-images.githubusercontent.com/1317309/107108729-79e23280-687d-11eb-8d97-e7f3aeec51be.png">

Closes #31495 from HeartSaVioR/SPARK-34383.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
2021-03-22 08:47:07 +01:00
Cheng Su f8838fe82b [SPARK-34708][SQL] Code-gen for left semi/anti broadcast nested loop join (build right side)
### What changes were proposed in this pull request?

This PR is to add code-gen support for left semi / left anti BroadcastNestedLoopJoin (build side is right side). The execution code path for build left side cannot fit into whole stage code-gen framework, so only add the code-gen for build right side here.

Reference: the iterator (non-code-gen) code path is `BroadcastNestedLoopJoinExec.leftExistenceJoin()` with `BuildRight`.

### Why are the changes needed?

Improve query CPU performance.
Tested with a simple query:

```
val N = 20 << 20
val M = 1 << 4

val dim = broadcast(spark.range(M).selectExpr("id as k2"))
codegenBenchmark("left semi broadcast nested loop join", N) {
  park.range(N).selectExpr(s"id as k1").join(
    dim, col("k1") + 1 <= col("k2"), "left_semi")
}
```

Seeing 5x run time improvement:

```
Running benchmark: left semi broadcast nested loop join
  Running case: left semi broadcast nested loop join codegen off
  Stopped after 2 iterations, 6958 ms
  Running case: left semi broadcast nested loop join codegen on
  Stopped after 5 iterations, 3383 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.7
Intel(R) Core(TM) i9-9980HK CPU  2.40GHz
left semi broadcast nested loop join:             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------
left semi broadcast nested loop join codegen off           3434           3479          65          6.1         163.7       1.0X
left semi broadcast nested loop join codegen on             672            677           5         31.2          32.1       5.1X
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Changed existing unit test in `ExistenceJoinSuite.scala` to cover all code paths:
* left semi/anti + empty right side + empty condition
* left semi/anti + non-empty right side + empty condition
* left semi/anti + right side + non-empty condition

Added unit test in `WholeStageCodegenSuite.scala` to make sure code-gen for broadcast nested loop join is taking effect, and test for multiple join case as well.

Example query:

```
val df1 = spark.range(4).select($"id".as("k1"))
val df2 = spark.range(3).select($"id".as("k2"))
df1.join(df2, $"k1" + 1 <= $"k2", "left_semi").explain("codegen")
```

Example generated code (`bnlj_doConsume_0` method):
This is for left semi join. The generated code for left anti join is mostly to be same as here, except L55 to be `if (bnlj_findMatchedRow_0 == false) {`.
```
== Subtree 2 / 2 (maxMethodCodeSize:282; maxConstantPoolSize:203(0.31% used); numInnerClasses:0) ==
*(2) Project [id#0L AS k1#2L]
+- *(2) BroadcastNestedLoopJoin BuildRight, LeftSemi, ((id#0L + 1) <= k2#6L)
   :- *(2) Range (0, 4, step=1, splits=2)
   +- BroadcastExchange IdentityBroadcastMode, [id=#23]
      +- *(1) Project [id#4L AS k2#6L]
         +- *(1) Range (0, 3, step=1, splits=2)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=2
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean range_initRange_0;
/* 010 */   private long range_nextIndex_0;
/* 011 */   private TaskContext range_taskContext_0;
/* 012 */   private InputMetrics range_inputMetrics_0;
/* 013 */   private long range_batchEnd_0;
/* 014 */   private long range_numElementsTodo_0;
/* 015 */   private InternalRow[] bnlj_buildRowArray_0;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[4];
/* 017 */
/* 018 */   public GeneratedIteratorForCodegenStage2(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */
/* 026 */     range_taskContext_0 = TaskContext.get();
/* 027 */     range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 028 */     range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 029 */     range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 030 */     bnlj_buildRowArray_0 = (InternalRow[]) ((org.apache.spark.broadcast.TorrentBroadcast) references[1] /* broadcastTerm */).value();
/* 031 */     range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 032 */     range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 033 */
/* 034 */   }
/* 035 */
/* 036 */   private void bnlj_doConsume_0(long bnlj_expr_0_0) throws java.io.IOException {
/* 037 */     boolean bnlj_findMatchedRow_0 = false;
/* 038 */     for (int bnlj_arrayIndex_0 = 0; bnlj_arrayIndex_0 < bnlj_buildRowArray_0.length; bnlj_arrayIndex_0++) {
/* 039 */       UnsafeRow bnlj_buildRow_0 = (UnsafeRow) bnlj_buildRowArray_0[bnlj_arrayIndex_0];
/* 040 */
/* 041 */       long bnlj_value_1 = bnlj_buildRow_0.getLong(0);
/* 042 */
/* 043 */       long bnlj_value_3 = -1L;
/* 044 */
/* 045 */       bnlj_value_3 = bnlj_expr_0_0 + 1L;
/* 046 */
/* 047 */       boolean bnlj_value_2 = false;
/* 048 */       bnlj_value_2 = bnlj_value_3 <= bnlj_value_1;
/* 049 */       if (!(false || !bnlj_value_2))
/* 050 */       {
/* 051 */         bnlj_findMatchedRow_0 = true;
/* 052 */         break;
/* 053 */       }
/* 054 */     }
/* 055 */     if (bnlj_findMatchedRow_0 == true) {
/* 056 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1);
/* 057 */
/* 058 */       // common sub-expressions
/* 059 */
/* 060 */       range_mutableStateArray_0[3].reset();
/* 061 */
/* 062 */       range_mutableStateArray_0[3].write(0, bnlj_expr_0_0);
/* 063 */       append((range_mutableStateArray_0[3].getRow()).copy());
/* 064 */
/* 065 */     }
/* 066 */
/* 067 */   }
/* 068 */
/* 069 */   private void initRange(int idx) {
/* 070 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 071 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L);
/* 072 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(4L);
/* 073 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 074 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 075 */     long partitionEnd;
/* 076 */
/* 077 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 078 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 079 */       range_nextIndex_0 = Long.MAX_VALUE;
/* 080 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 081 */       range_nextIndex_0 = Long.MIN_VALUE;
/* 082 */     } else {
/* 083 */       range_nextIndex_0 = st.longValue();
/* 084 */     }
/* 085 */     range_batchEnd_0 = range_nextIndex_0;
/* 086 */
/* 087 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 088 */     .multiply(step).add(start);
/* 089 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 090 */       partitionEnd = Long.MAX_VALUE;
/* 091 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 092 */       partitionEnd = Long.MIN_VALUE;
/* 093 */     } else {
/* 094 */       partitionEnd = end.longValue();
/* 095 */     }
/* 096 */
/* 097 */     java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 098 */       java.math.BigInteger.valueOf(range_nextIndex_0));
/* 099 */     range_numElementsTodo_0  = startToEnd.divide(step).longValue();
/* 100 */     if (range_numElementsTodo_0 < 0) {
/* 101 */       range_numElementsTodo_0 = 0;
/* 102 */     } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 103 */       range_numElementsTodo_0++;
/* 104 */     }
/* 105 */   }
/* 106 */
/* 107 */   protected void processNext() throws java.io.IOException {
/* 108 */     // initialize Range
/* 109 */     if (!range_initRange_0) {
/* 110 */       range_initRange_0 = true;
/* 111 */       initRange(partitionIndex);
/* 112 */     }
/* 113 */
/* 114 */     while (true) {
/* 115 */       if (range_nextIndex_0 == range_batchEnd_0) {
/* 116 */         long range_nextBatchTodo_0;
/* 117 */         if (range_numElementsTodo_0 > 1000L) {
/* 118 */           range_nextBatchTodo_0 = 1000L;
/* 119 */           range_numElementsTodo_0 -= 1000L;
/* 120 */         } else {
/* 121 */           range_nextBatchTodo_0 = range_numElementsTodo_0;
/* 122 */           range_numElementsTodo_0 = 0;
/* 123 */           if (range_nextBatchTodo_0 == 0) break;
/* 124 */         }
/* 125 */         range_batchEnd_0 += range_nextBatchTodo_0 * 1L;
/* 126 */       }
/* 127 */
/* 128 */       int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L);
/* 129 */       for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) {
/* 130 */         long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0;
/* 131 */
/* 132 */         bnlj_doConsume_0(range_value_0);
/* 133 */
/* 134 */         if (shouldStop()) {
/* 135 */           range_nextIndex_0 = range_value_0 + 1L;
/* 136 */           ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localIdx_0 + 1);
/* 137 */           range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1);
/* 138 */           return;
/* 139 */         }
/* 140 */
/* 141 */       }
/* 142 */       range_nextIndex_0 = range_batchEnd_0;
/* 143 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0);
/* 144 */       range_inputMetrics_0.incRecordsRead(range_localEnd_0);
/* 145 */       range_taskContext_0.killTaskIfInterrupted();
/* 146 */     }
/* 147 */   }
/* 148 */
/* 149 */ }
```

Closes #31874 from c21/code-semi-anti.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-22 07:31:16 +00:00
HyukjinKwon c7bf8adc38 [SPARK-34818][PYTHON][DOCS] Reorder the items in User Guide at PySpark documentation
### What changes were proposed in this pull request?

This PR proposes to reorder the items in User Guide in PySpark documentation in order to place general guides first and advance ones later.

### Why are the changes needed?

For users to more easily follow.

### Does this PR introduce _any_ user-facing change?

Yes, it changes the order in the items in documentation .

### How was this patch tested?

Manually verified the documentation after building:

<img width="768" alt="Screen Shot 2021-03-22 at 2 38 41 PM" src="https://user-images.githubusercontent.com/6477701/111945072-5537d680-8b1c-11eb-9f43-02f3ad63a509.png">

FWIW, the current page: https://spark.apache.org/docs/latest/api/python/user_guide/index.html

Closes #31922 from HyukjinKwon/SPARK-34818.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-22 15:53:39 +09:00
Yuanjian Li 45235ac4bc [SPARK-34748][SS] Create a rule of the analysis logic for streaming write
### What changes were proposed in this pull request?
- Create a new rule `ResolveStreamWrite` for all analysis logic for streaming write.
- Add corresponding logical plans `WriteToStreamStatement` and `WriteToStream`.

### Why are the changes needed?
Currently, the analysis logic for streaming write is mixed in StreamingQueryManager. If we create a specific analyzer rule and separated logical plans, it should be helpful for further extension.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #31842 from xuanyuanking/SPARK-34748.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-22 06:39:39 +00:00
Bo Zhang 3bef2dc01a Revert "[SPARK-34757][CORE][DEPLOY] Ignore cache for SNAPSHOT dependencies in spark-submit"
### What changes were proposed in this pull request?

This reverts commit 86ea520320.

### Why are the changes needed?

The test added in the change was flaky.

Closes #31918 from bozhang2820/revert-spark-34757.

Authored-by: Bo Zhang <bo.zhang@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-22 15:07:15 +09:00
Kousuke Saruta 0734101bb7 [SPARK-34225][CORE] Don't encode further when a URI form string is passed to addFile or addJar
### What changes were proposed in this pull request?

This PR fixes an issue that `addFile` and `addJar` further encode even though a URI form string is passed.
For example, the following operation will throw exception even though the file exists.
```
sc.addFile("file:/foo/test%20file.txt")
```

Another case is `--files` and `--jars` option when we submit an application.
```
bin/spark-shell --files "/foo/test file.txt"
```
The path above is transformed to URI form [here](ecf4811764/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala (L400)) and passed to `addFile` so the same issue happens.

### Why are the changes needed?

This is a bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #31718 from sarutak/fix-uri-encode-double.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
2021-03-22 14:06:41 +09:00
Josh Soref f4de93efb0 [MINOR][SQL] Spelling: filters - PushedFilers
### What changes were proposed in this pull request?
Consistently correct the spelling of `PushedFilters`

### Why are the changes needed?
bersprockets noted that it's wrong

### Does this PR introduce _any_ user-facing change?

Technically, I think it does. Practically, neither Google nor GitHub show anyone using `pushedFilers` outside of forks (or the discussion about fixing it started at https://github.com/apache/spark/pull/30323#issuecomment-725568719)

### How was this patch tested?
None beyond CI in the previous PR

Closes #30678 from jsoref/spelling-filters.

Authored-by: Josh Soref <jsoref@users.noreply.github.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-03-22 08:00:12 +03:00
Ruifeng Zheng 47da944f59 [SPARK-34470][ML] VectorSlicer utilize ordering if possible
### What changes were proposed in this pull request?
1, add a new param `sorted` in `slice`;
2, in `VectorSlicer`, set `sorted = true` if input indices are ordered.

### Why are the changes needed?
The input indices of VectorSlicer are probably ordered.
VectorSlicer should use this attribute if possible.

I did a simple test and `sorted = true` maybe about 70% faster than existing `slice`

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
added testsuite

Closes #31588 from zhengruifeng/vector_slice_for_sorted_indices.

Authored-by: Ruifeng Zheng <ruifengz@foxmail.com>
Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
2021-03-22 09:46:53 +08:00
Dongjoon Hyun c5fd94f119 [SPARK-34772][TESTS][FOLLOWUP] Disable a test case using Hive 1.2.1 in Java9+ environment
### What changes were proposed in this pull request?

This PR aims to disable a new test case using Hive 1.2.1 from Java9+ test environment.

### Why are the changes needed?

[HIVE-6113](https://issues.apache.org/jira/browse/HIVE-6113) upgraded Datanucleus to 4.x at Hive 2.0. Datanucleus 3.x doesn't support Java9+.

**Java 9+ Environment**
```
$ build/sbt "hive/testOnly *.HiveSparkSubmitSuite -- -z SPARK-34772" -Phive
...
[info] *** 1 TEST FAILED ***
[error] Failed: Total 1, Failed 1, Errors 0, Passed 0
[error] Failed tests:
[error] 	org.apache.spark.sql.hive.HiveSparkSubmitSuite
[error] (hive / Test / testOnly) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 328 s (05:28), completed Mar 21, 2021, 5:32:39 PM
```

### Does this PR introduce _any_ user-facing change?

Fix the UT in Java9+ environment.

### How was this patch tested?

Manually.

```
$ build/sbt "hive/testOnly *.HiveSparkSubmitSuite -- -z SPARK-34772" -Phive
...
[info] HiveSparkSubmitSuite:
[info] - SPARK-34772: RebaseDateTime loadRebaseRecords should use Spark classloader instead of context !!! CANCELED !!! (26 milliseconds)
[info]   org.apache.commons.lang3.SystemUtils.isJavaVersionAtLeast(JAVA_9) was true (HiveSparkSubmitSuite.scala:344)
```

Closes #31916 from dongjoon-hyun/SPARK-HiveSparkSubmitSuite.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-03-21 17:59:55 -07:00
Dongjoon Hyun 3bc6fe4e77 [SPARK-34809][CORE] Enable spark.hadoopRDD.ignoreEmptySplits by default
### What changes were proposed in this pull request?

This PR aims to enable `spark.hadoopRDD.ignoreEmptySplits` by default for Apache Spark 3.2.0.

### Why are the changes needed?

Although this is a safe improvement, this hasn't been enabled by default to avoid the explicit behavior change. This PR aims to switch the default explicitly in Apache Spark 3.2.0.

### Does this PR introduce _any_ user-facing change?

Yes, the behavior change is documented.

### How was this patch tested?

Pass the existing CIs.

Closes #31909 from dongjoon-hyun/SPARK-34809.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-03-21 14:34:02 -07:00
Dongjoon Hyun 3c32b54a0f [SPARK-34811][CORE] Redact fs.s3a.access.key like secret and token
### What changes were proposed in this pull request?

Like we redact secrets and tokens, this PR aims to redact access key.

### Why are the changes needed?

Access key is also worth to hide.

### Does this PR introduce _any_ user-facing change?

This will hide this information from SparkUI (`Spark Properties` and `Hadoop Properties` and logs).

### How was this patch tested?

Pass the newly updated UT.

Closes #31912 from dongjoon-hyun/SPARK-34811.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-03-21 14:08:34 -07:00
Zhang, Xingchao 2888d1883e [SPARK-34784][BUILD] Upgrade Jackson to 2.12.2
### What changes were proposed in this pull request?

This pr upgrade Jackson to 2.12.2.
Jackson Release 2.12: https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.12

### Why are the changes needed?

Make it easy to upgrade Avro 1.10.2.

```
[error] Caused by: sbt.ForkMain$ForkError: com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.11.4 requires Jackson Databind version >= 2.11.0 and < 2.12.0
[error] 	at com.fasterxml.jackson.module.scala.JacksonModule.setupModule(JacksonModule.scala:61)
[error] 	at com.fasterxml.jackson.module.scala.JacksonModule.setupModule$(JacksonModule.scala:46)
[error] 	at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:17)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Tested with Avro 1.10.2 and Parquet 1.12.0: https://github.com/apache/spark/runs/2157735537

Closes #31878 from xclyfe/SPARK-34784.

Authored-by: Zhang, Xingchao <xingczhang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-03-21 15:36:38 +08:00
William Hyun c799d049fc [SPARK-34810][TEST] Update PostgreSQL test with the latest results
### What changes were proposed in this pull request?

This PR aims to update `PostgresIntegrationSuite` with the latest results.

### Why are the changes needed?

The latest PostgreSQL jar version is 42.2.19. Since 42.2.9, the test is broken because it returns `0.0` instead of `0.00`.
- https://jdbc.postgresql.org/documentation/changelog.html#version_42.2.19

42.2.9 (2019-12-06)
42.2.10 (2020-01-30)
42.2.11 (2020-03-09)
42.2.12 (2020-03-31)
42.2.13 (2020-06-04)
42.2.14 (2020-06-10)
42.2.15 (2020-08-14)
42.2.16 (2020-08-20)
42.2.17 (2020-10-09)
42.2.18 (2020-10-15)
42.2.19 (2021-02-18)

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CI with the updated test cases.

```
build/sbt -Pdocker-integration-tests 'docker-integration-tests/testOnly org.apache.spark.sql.jdbc.PostgresIntegrationSuite'
```

Closes #31910 from williamhyun/pg.

Authored-by: William Hyun <williamhyun3@gmail.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-03-21 13:36:45 +08:00
Kousuke Saruta 94fd6cb0ce [SPARK-34636][FOLLOWUP][SQL] Fix an incompatible behavior of UnresolvedAttribute.sql
### What changes were proposed in this pull request?

This PR fixes an incompatible behavior introduced by #31754.
The problem is that quoted name parts represented as a string are given to the constructor of `UnresolvedAttribute` which takes single string parameter, `sql` method invocation against the `UnresolvedAttrribute` returns different result than before.

One example is ``` UnresolvedAttribute("`a.b`").sql ```. This  returned `a.b` before but it doesn't now.

See [this duscussion](https://github.com/apache/spark/pull/31754/files#r597181927) for more details.

### Why are the changes needed?

For compatibility.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New assertion.

Closes #31885 from sarutak/followup-SPARK-34636.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-03-20 14:44:36 -07:00
Yuming Wang 908318f30d [SPARK-28220][SQL] Improve PropagateEmptyRelation to support join with false condition
### What changes were proposed in this pull request?

Improve `PropagateEmptyRelation` to support join with false condition. For example:
```sql
SELECT * FROM t1 LEFT JOIN t2 ON false
```

Before this pr:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastNestedLoopJoin BuildRight, LeftOuter, false
   :- FileScan parquet default.t1[a#4L]
   +- BroadcastExchange IdentityBroadcastMode, [id=#40]
      +- FileScan parquet default.t2[b#5L]
```

After this pr:
```
== Physical Plan ==
*(1) Project [a#4L, null AS b#5L]
+- *(1) ColumnarToRow
   +- FileScan parquet default.t1[a#4L]
```

### Why are the changes needed?

Avoid `BroadcastNestedLoopJoin` to improve query performance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #31857 from wangyum/SPARK-28220.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-03-20 22:57:02 +08:00
Sean Owen ed641fbad6 [MINOR][DOCS][ML] Doc 'mode' as a supported Imputer strategy in Pyspark
### What changes were proposed in this pull request?

Document `mode` as a supported Imputer strategy in Pyspark docs.

### Why are the changes needed?

Support was added in 3.1, and documented in Scala, but some Python docs were missed.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #31883 from srowen/ImputerModeDocs.

Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-03-20 01:16:49 -05:00
Kent Yao 2cdedef2a0 [SPARK-34128][SQL] Suppress undesirable TTransportException warnings involved in THRIFT-4805
### What changes were proposed in this pull request?

Since Spark 3.0, the `libthrift` has been bumped up from 0.9.3 to 0.12.0.

Due to THRIFT-4805, The SparkThrift Server will print annoying TExceptions. For example, the current thrift server module test in Github action workflow outputs more than 200MB of data for this error only
```java
org.apache.thrift.transport.TTransportException
	at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
	at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
	at org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:374)
	at org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:451)
	at org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:433)
	at org.apache.thrift.transport.TSaslServerTransport.read(TSaslServerTransport.java:43)
	at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
	at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
	at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
	at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
	at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
	at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:53)
	at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

I checked the latest `hive-service-rpc` module in the maven center,  https://mvnrepository.com/artifact/org.apache.hive/hive-service-rpc/3.1.2.  It still uses the 0.9.3 version.

Unfortunately, I tried the newly released `libthrift 0.14.1`(w/o shading it), it breaks the metastore client side.

```scala
java.lang.NoSuchMethodError: org.apache.thrift.transport.TSocket.<init>(Ljava/lang/String;II)V
```
On the Thrift side, they just muted it see https://issues.apache.org/jira/browse/THRIFT-4805

So in this PR, I add a filter to suppress the warning

### Why are the changes needed?

if the log is too large, the Github action might truncate it. We need to reduce useless output.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

```build/sbt "hive-thriftserver/testOnly *ThriftServerQueryTestSuite" -Phive-thriftserver``` locally

#### before

```java
[info] - count.sql (1 second, 537 milliseconds)
[info] - decimalArithmeticOperations.sql !!! IGNORED !!!
14:09:53.233 ERROR org.apache.thrift.server.TThreadPoolServer: Thrift error occurred during processing of message.
org.apache.thrift.transport.TTransportException
	at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
	at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
	at org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:374)
	at org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:451)
	at org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:433)
	at org.apache.thrift.transport.TSaslServerTransport.read(TSaslServerTransport.java:43)
	at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
	at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
	at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
	at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
	at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
	at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:53)
	at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[info] - group-analytics.sql (4 seconds, 282 milliseconds)

[info] - csv-functions.sql (400 milliseconds)
14:09:24.234 ERROR org.apache.thrift.server.TThreadPoolServer: Thrift error occurred during processing of message.
org.apache.thrift.transport.TTransportException
	at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
	at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
	at org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:374)
	at org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:451)
	at org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:433)
	at org.apache.thrift.transport.TSaslServerTransport.read(TSaslServerTransport.java:43)
	at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
	at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
	at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
	at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
	at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
	at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:53)
	at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[info] - datetime-formatting-invalid.sql (349 milliseconds)
14:09:26.544 ERROR org.apache.thrift.server.TThreadPoolServer: Thrift error occurred during processing of message.
org.apache.thrift.transport.TTransportException
	at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
	at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
	at org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:374)
	at org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:451)
	at org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:433)
	at org.apache.thrift.transport.TSaslServerTransport.read(TSaslServerTransport.java:43)
	at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
	at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
	at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
	at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
	at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
	at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:53)
	at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[info] - except.sql (2 seconds, 309 milliseconds)
14:09:27.782 ERROR org.apache.thrift.server.TThreadPoolServer: Thrift error occurred during processing of message.
org.apache.thrift.transport.TTransportException
	at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
	at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
	at org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:374)
	at org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:451)
	at org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:433)
	at org.apache.thrift.transport.TSaslServerTransport.read(TSaslServerTransport.java:43)
	at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
	at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
	at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
	at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
	at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
	at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:53)
	at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[info] - string-functions.sql (1 second, 237 milliseconds)
14:09:27.835 WARN org.apache.spark.sql.execution.datasources.DataSource: All paths were ignored:

14:09:29.266 ERROR org.apache.thrift.server.TThreadPoolServer: Thrift error occurred during processing of message.
org.apache.thrift.transport.TTransportException
	at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
	at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
	at org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:374)
	at org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:451)
	at org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:433)
	at org.apache.thrift.transport.TSaslServerTransport.read(TSaslServerTransport.java:43)
	at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
	at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
	at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
	at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
	at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
	at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:53)
	at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

```

#### after

```java

[info] - null-propagation.sql (181 milliseconds)
[info] - operators.sql (1 second, 772 milliseconds)
[info] - change-column.sql (241 milliseconds)
[info] - count.sql (1 second, 665 milliseconds)
[info] - decimalArithmeticOperations.sql !!! IGNORED !!!
[info] - group-analytics.sql (3 seconds, 926 milliseconds)
[info] - inline-table.sql (247 milliseconds)
[info] - comparator.sql (223 milliseconds)
[info] - show-tblproperties.sql (148 milliseconds)
[info] - timezone.sql (105 milliseconds)
[info] - parse-schema-string.sql (193 milliseconds)
```

Closes #31895 from yaooqinn/SPARK-34128-2.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-03-19 21:15:28 -07:00
Ruifeng Zheng f11950f08f [SPARK-32384][CORE] repartitionAndSortWithinPartitions avoid shuffle with same partitioner
### What changes were proposed in this pull request?
avoid unnecessary shuffle if possible

### Why are the changes needed?
avoid unnecessary shuffle.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
added testsuites and existing testsuites

Closes #31480 from zhengruifeng/repartitionAndSortWithinPartitions_opt_II.

Authored-by: Ruifeng Zheng <ruifengz@foxmail.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
2021-03-20 10:29:48 +08:00
Cheng Su 2ff0032e01 [SPARK-34796][SQL] Initialize counter variable for LIMIT code-gen in doProduce()
### What changes were proposed in this pull request?

This PR is to fix the LIMIT code-gen bug in https://issues.apache.org/jira/browse/SPARK-34796, where the counter variable from `BaseLimitExec` is not initialized but used in code-gen. This is because the limit counter variable will be used in upstream operators (LIMIT's child plan, e.g. `ColumnarToRowExec` operator for early termination), but in the same stage, there can be some operators doing the shortcut and not calling `BaseLimitExec`'s `doConsume()`, e.g. [HashJoin.codegenInner](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala#L402). So if we have query that `LocalLimit - BroadcastHashJoin - FileScan` in the same stage, the whole stage code-gen compilation will be failed.

Here is an example:

```
  test("failed limit query") {
    withTable("left_table", "empty_right_table", "output_table") {
      spark.range(5).toDF("k").write.saveAsTable("left_table")
      spark.range(0).toDF("k").write.saveAsTable("empty_right_table")

      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
        spark.sql("CREATE TABLE output_table (k INT) USING parquet")
        spark.sql(
          s"""
             |INSERT INTO TABLE output_table
             |SELECT t1.k FROM left_table t1
             |JOIN empty_right_table t2
             |ON t1.k = t2.k
             |LIMIT 3
             |""".stripMargin)
      }
    }
  }
```

Query plan:

```
Execute InsertIntoHadoopFsRelationCommand file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table, false, Parquet, Map(path -> file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table), Append, CatalogTable(
Database: default
Table: output_table
Created Time: Thu Mar 18 21:46:26 PDT 2021
Last Access: UNKNOWN
Created By: Spark 3.2.0-SNAPSHOT
Type: MANAGED
Provider: parquet
Location: file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table
Schema: root
 |-- k: integer (nullable = true)
), org.apache.spark.sql.execution.datasources.InMemoryFileIndexb25d08b, [k]
+- *(3) Project [ansi_cast(k#228L as int) AS k#231]
   +- *(3) GlobalLimit 3
      +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#179]
         +- *(2) LocalLimit 3
            +- *(2) Project [k#228L]
               +- *(2) BroadcastHashJoin [k#228L], [k#229L], Inner, BuildRight, false
                  :- *(2) Filter isnotnull(k#228L)
                  :  +- *(2) ColumnarToRow
                  :     +- FileScan parquet default.left_table[k#228L] Batched: true, DataFilters: [isnotnull(k#228L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:bigint>
                  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#173]
                     +- *(1) Filter isnotnull(k#229L)
                        +- *(1) ColumnarToRow
                           +- FileScan parquet default.empty_right_table[k#229L] Batched: true, DataFilters: [isnotnull(k#229L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:bigint>
```

Codegen failure - https://gist.github.com/c21/ea760c75b546d903247582be656d9d66 .

The uninitialized variable `_limit_counter_1` from `LocalLimitExec` is referenced in `ColumnarToRowExec`, but `BroadcastHashJoinExec` does not call `LocalLimitExec.doConsume()` to initialize the counter variable.

The fix is to move the counter variable initialization to `doProduce()`, as in whole stage code-gen framework, `doProduce()` will definitely be called if upstream operators `doProduce()`/`doConsume()` is called.

Note: this only happens in AQE disabled case, because we have an AQE optimization rule [EliminateUnnecessaryJoin](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala#L69) to change the whole query to an empty `LocalRelation` if inner join broadcast side is empty with AQE enabled.

### Why are the changes needed?

Fix query failure.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test in `SQLQuerySuite.scala`.

Closes #31892 from c21/limit-fix.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-03-20 11:20:52 +09:00
tanel.kiis@gmail.com 620cae098c [SPARK-33122][SQL] Remove redundant aggregates in the Optimzier
### What changes were proposed in this pull request?

Added optimizer rule `RemoveRedundantAggregates`. It removes redundant aggregates from a query plan. A redundant aggregate is an aggregate whose only goal is to keep distinct values, while its parent aggregate would ignore duplicate values.

The affected part of the query plan for TPCDS q87:

Before:
```
== Physical Plan ==
*(26) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition, true, [id=#785]
   +- *(25) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
         +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
            +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
               +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
                  +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
                     +- Exchange hashpartitioning(c_last_name#61, c_first_name#60, d_date#26, 5), true, [id=#724]
                        +- *(24) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
                           +- SortMergeJoin [coalesce(c_last_name#61, ), isnull(c_last_name#61), coalesce(c_first_name#60, ), isnull(c_first_name#60), coalesce(d_date#26, 0), isnull(d_date#26)], [coalesce(c_last_name#221, ), isnull(c_last_name#221), coalesce(c_first_name#220, ), isnull(c_first_name#220), coalesce(d_date#186, 0), isnull(d_date#186)], LeftAnti
                              :- ...
```

After:
```
== Physical Plan ==
*(26) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition, true, [id=#751]
   +- *(25) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
         +- Exchange hashpartitioning(c_last_name#61, c_first_name#60, d_date#26, 5), true, [id=#694]
            +- *(24) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
               +- SortMergeJoin [coalesce(c_last_name#61, ), isnull(c_last_name#61), coalesce(c_first_name#60, ), isnull(c_first_name#60), coalesce(d_date#26, 0), isnull(d_date#26)], [coalesce(c_last_name#221, ), isnull(c_last_name#221), coalesce(c_first_name#220, ), isnull(c_first_name#220), coalesce(d_date#186, 0), isnull(d_date#186)], LeftAnti
                  :- ...
```

### Why are the changes needed?

Performance improvements - few TPCDS queries have these kinds of duplicate aggregates.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

Benchmarks (sf=5):

OpenJDK 64-Bit Server VM 1.8.0_265-b01 on Linux 5.8.13-arch1-1
Intel(R) Core(TM) i5-6500 CPU  3.20GHz

| Query | Before  | After | Speedup |
| ------| ------- | ------| ------- |
| q14a | 44s | 44s | 1x |
| q14b | 41s | 41s | 1x |
| q38  | 6.5s | 5.9s | 1.1x |
| q87  | 7.2s | 6.8s | 1.1x |
| q14a-v2.7 | 55s | 53s | 1x |

Closes #30018 from tanelk/SPARK-33122.

Lead-authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Co-authored-by: Tanel Kiis <tanel.kiis@reach-u.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-03-20 11:16:39 +09:00
Liang-Chi Hsieh 7a8a600995 [SPARK-34776][SQL] Nested column pruning should not prune Window produced attributes
### What changes were proposed in this pull request?

This patch proposes to fix a bug related to `NestedColumnAliasing`. The root cause is `Window`  doesn't override `producedAttributes` so `NestedColumnAliasing` rule wrongly prune attributes produced by `Window`.

The master and branch-3.1 both have this issue.

### Why are the changes needed?

It is needed to fix a bug of nested column pruning.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test.

Closes #31897 from viirya/SPARK-34776.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-03-19 11:44:02 -07:00
Dongjoon Hyun 2fa792aa64 [SPARK-34783][K8S] Support remote template files
### What changes were proposed in this pull request?

This PR aims to support remote driver/executor template files.

### Why are the changes needed?

Currently, `KubernetesUtils.loadPodFromTemplate` supports only local files.

With this PR, we can do the following.
```bash
bin/spark-submit \
...
-c spark.kubernetes.driver.podTemplateFile=s3a://dongjoon/driver.yml \
-c spark.kubernetes.executor.podTemplateFile=s3a://dongjoon/executor.yml \
...
```

### Does this PR introduce _any_ user-facing change?

Yes, this is an improvement.

### How was this patch tested?

Manual testing.

Closes #31877 from dongjoon-hyun/SPARK-34783-2.

Lead-authored-by: Dongjoon Hyun <dhyun@apple.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-03-19 08:52:42 -07:00
Max Gekk 089c3b77e1 [SPARK-34793][SQL] Prohibit saving of day-time and year-month intervals
### What changes were proposed in this pull request?
For all built-in datasources, prohibit saving of year-month and day-time intervals that were introduced by SPARK-27793. We plan to support saving of such types at the milestone 2, see SPARK-27790.

### Why are the changes needed?
To improve user experience with Spark SQL, and print nicer error message. Current error message might confuse users:
```
scala> Seq(java.time.Period.ofMonths(1)).toDF.write.mode("overwrite").json("/Users/maximgekk/tmp/123")
21/03/18 22:44:35 ERROR FileFormatWriter: Aborting job 8de402d7-ab69-4dc0-aa8e-14ef06bd2d6b.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (192.168.1.66 executor driver): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:418)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:298)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:211)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1437)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to convert value 1 (class of class java.lang.Integer}) with the type of YearMonthIntervalType to JSON.
	at scala.sys.package$.error(package.scala:30)
	at org.apache.spark.sql.catalyst.json.JacksonGenerator.$anonfun$makeWriter$23(JacksonGenerator.scala:179)
	at org.apache.spark.sql.catalyst.json.JacksonGenerator.$anonfun$makeWriter$23$adapted(JacksonGenerator.scala:176)
```

### Does this PR introduce _any_ user-facing change?
Yes. After the changes, the example above:
```
scala> Seq(java.time.Period.ofMonths(1)).toDF.write.mode("overwrite").json("/Users/maximgekk/tmp/123")
org.apache.spark.sql.AnalysisException: Cannot save interval data type into external storage.
```

### How was this patch tested?
1. Checked nested intervals:
```
scala> spark.range(1).selectExpr("""struct(timestamp'2021-01-02 00:01:02' - timestamp'2021-01-01 00:00:00')""").write.mode("overwrite").parquet("/Users/maximgekk/tmp/123")
org.apache.spark.sql.AnalysisException: Cannot save interval data type into external storage.
scala> Seq(Seq(java.time.Period.ofMonths(1))).toDF.write.mode("overwrite").json("/Users/maximgekk/tmp/123")
org.apache.spark.sql.AnalysisException: Cannot save interval data type into external storage.
```
2. By running existing test suites:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *DataSourceV2DataFrameSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *DataSourceV2SQLSuite"
```

Closes #31884 from MaxGekk/ban-save-intervals.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-03-19 18:47:53 +03:00
Hongyi Zhang 6f89cdfb0c [SPARK-34798][SQL][TESTS] Fix incorrect join condition
### What changes were proposed in this pull request?

join condition 'a.attr == 'c.attr check the reference of  these 2 objects which will always returns false. we need to use === instead

### Why are the changes needed?

Although this join condition always false doesn't break the test but it is not what we expected. We should fix it to avoid future confusing

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

Closes #31890 from opensky142857/SPARK-34798.

Authored-by: Hongyi Zhang <hongyzhang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-03-19 23:35:15 +08:00
Wenchen Fan 4b4f8e2a25 [SPARK-34558][SQL][FOLLOWUP] Use final Hadoop conf to instantiate FileSystem in SharedState
### What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/31671

https://github.com/apache/spark/pull/31671 has an unexpected behavior change that it uses a different Hadoop conf (`sparkContext.hadoopConfiguration`) to instantiate `FileSystem`, which is used to qualify the warehouse path. Before https://github.com/apache/spark/pull/31671 , the Hadoop conf to instantiate `FileSystem` is `session.sessionState.newHadoopConf()`.

More specifically, `session.sessionState.newHadoopConf()` has more conf entries:
1. it includes configs from `SharedState.initialConfigs`
2. in includes configs from `sparkContext.conf`

This PR updates `SharedState` to use the final Hadoop conf to instantiate `FileSystem`.

### Why are the changes needed?

fix behavior change

### Does this PR introduce _any_ user-facing change?

yes, the behavior will be the same before https://github.com/apache/spark/pull/31671

### How was this patch tested?

manually check the log of `FileSystem` and verify the passed in configs.

Closes #31868 from cloud-fan/followup.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-19 22:02:15 +08:00
ulysses-you 58509565f8 [SPARK-34772][SQL] RebaseDateTime loadRebaseRecords should use Spark classloader instead of context
### What changes were proposed in this pull request?

Change context classloader to Spark classloader at `RebaseDateTime.loadRebaseRecords`

### Why are the changes needed?

With custom `spark.sql.hive.metastore.version` and `spark.sql.hive.metastore.jars`.

Spark would use date formatter in `HiveShim` that convert `date` to `string`, if we set `spark.sql.legacy.timeParserPolicy=LEGACY` and the partition type is `date` the `RebaseDateTime` code will be invoked. At that moment, if `RebaseDateTime` is initialized the first time then context class loader is `IsolatedClientLoader`. Such error msg would throw:

```
java.lang.IllegalArgumentException: argument "src" is null
  at com.fasterxml.jackson.databind.ObjectMapper._assertNotNull(ObjectMapper.java:4413)
  at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3157)
  at com.fasterxml.jackson.module.scala.ScalaObjectMapper.readValue(ScalaObjectMapper.scala:187)
  at com.fasterxml.jackson.module.scala.ScalaObjectMapper.readValue$(ScalaObjectMapper.scala:186)
  at org.apache.spark.sql.catalyst.util.RebaseDateTime$$anon$1.readValue(RebaseDateTime.scala:267)
  at org.apache.spark.sql.catalyst.util.RebaseDateTime$.loadRebaseRecords(RebaseDateTime.scala:269)
  at org.apache.spark.sql.catalyst.util.RebaseDateTime$.<init>(RebaseDateTime.scala:291)
  at org.apache.spark.sql.catalyst.util.RebaseDateTime$.<clinit>(RebaseDateTime.scala)
  at org.apache.spark.sql.catalyst.util.DateTimeUtils$.toJavaDate(DateTimeUtils.scala:109)
  at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format(DateFormatter.scala:95)
  at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format$(DateFormatter.scala:94)
  at org.apache.spark.sql.catalyst.util.LegacySimpleDateFormatter.format(DateFormatter.scala:138)
  at org.apache.spark.sql.hive.client.Shim_v0_13$ExtractableLiteral$1$.unapply(HiveShim.scala:661)
  at org.apache.spark.sql.hive.client.Shim_v0_13.convert$1(HiveShim.scala:785)
  at org.apache.spark.sql.hive.client.Shim_v0_13.$anonfun$convertFilters$4(HiveShim.scala:826)
```

```
java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.util.RebaseDateTime$
  at org.apache.spark.sql.catalyst.util.DateTimeUtils$.toJavaDate(DateTimeUtils.scala:109)
  at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format(DateFormatter.scala:95)
  at org.apache.spark.sql.catalyst.util.LegacyDateFormatter.format$(DateFormatter.scala:94)
  at org.apache.spark.sql.catalyst.util.LegacySimpleDateFormatter.format(DateFormatter.scala:138)
  at org.apache.spark.sql.hive.client.Shim_v0_13$ExtractableLiteral$1$.unapply(HiveShim.scala:661)
  at org.apache.spark.sql.hive.client.Shim_v0_13.convert$1(HiveShim.scala:785)
  at org.apache.spark.sql.hive.client.Shim_v0_13.$anonfun$convertFilters$4(HiveShim.scala:826)
  at scala.collection.immutable.Stream.flatMap(Stream.scala:493)
  at org.apache.spark.sql.hive.client.Shim_v0_13.convertFilters(HiveShim.scala:826)
  at org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:848)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitionsByFilter$1(HiveClientImpl.scala:749)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:291)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:224)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:223)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:273)
  at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionsByFilter(HiveClientImpl.scala:747)
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitionsByFilter$1(HiveExternalCatalog.scala:1273)
```

The reproduce steps:
1. `spark.sql.hive.metastore.version` and `spark.sql.hive.metastore.jars`.
2. `CREATE TABLE t (c int) PARTITIONED BY (p date)`
3. `SET spark.sql.legacy.timeParserPolicy=LEGACY`
4. `SELECT * FROM t WHERE p='2021-01-01'`

### Does this PR introduce _any_ user-facing change?

Yes, bug fix.

### How was this patch tested?

pass `org.apache.spark.sql.catalyst.util.RebaseDateTimeSuite` and add new unit test to `HiveSparkSubmitSuite.scala`.

Closes #31864 from ulysses-you/SPARK-34772.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
2021-03-19 12:51:43 +08:00
Max Gekk a48b2086dd [SPARK-34761][SQL] Support add/subtract of a day-time interval to/from a timestamp
### What changes were proposed in this pull request?
Support `timestamp +/- day-time interval`. In the PR, I propose to extend the `TimeAdd` expression and support `DayTimeIntervalType` as the `interval` parameter. The expression invokes the new method `DateTimeUtils.timestampAddDayTime()` which splits the input day-time interval to `days` and `microsecond adjustment` of a day, and adds `days` (and the microseconds) to a local timestamp derived from the given timestamp at the given time zone.  The resulted local timestamp is converted back to the offset in microseconds since the epoch.

Also I updated the rules that handle `CalendarIntervalType` and produce `TimeAdd` to take into account new type `DateTimeIntervalType` for the `interval` parameter of `TimeAdd`.

### Why are the changes needed?
To conform the ANSI SQL standard which requires to support such operation over timestamps and intervals:
<img width="811" alt="Screenshot 2021-03-12 at 11 36 14" src="https://user-images.githubusercontent.com/1580697/111081674-865d4900-8515-11eb-86c8-3538ecaf4804.png">

### Does this PR introduce _any_ user-facing change?
Should not since new intervals have not been released yet.

### How was this patch tested?
By running new tests:
```
$ build/sbt "test:testOnly *DateTimeUtilsSuite"
$ build/sbt "test:testOnly *DateExpressionsSuite"
$ build/sbt "test:testOnly *ColumnExpressionSuite"
```

Closes #31855 from MaxGekk/timestamp-add-day-time-interval.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-19 04:02:34 +00:00
Karuppayya Rajendran 0a58029d52 [SPARK-31897][SQL] Enable codegen for GenerateExec
### What changes were proposed in this pull request?
Enabling codegen for GenerateExec

### Why are the changes needed?
To leverage code generation for Generators

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- UT tests added

### Benchmark
```
case class Data(value1: Float, value2: Map[String, String], value3: String)
val path = "<path>"

val numRecords = Seq(10000000, 100000000)
numRecords.map {
  recordCount =>
    import java.util.concurrent.TimeUnit.NANOSECONDS

    val srcDF = spark.range(recordCount).map {
      x => Data(x.toFloat, Map(x.toString -> x.toString ), s"value3$x")
    }.select($"value1", explode($"value2"), $"value3")
    val start = System.nanoTime()
    srcDF
      .write
      .mode("overwrite")
      .parquet(s"$path/$recordCount")
    val end = System.nanoTime()
    val diff = end - start
    (recordCount, NANOSECONDS.toMillis(diff))
}
```
**With codegen**:
```
res0: Seq[(Int, Long)] = List((10000000,13989), (100000000,129625))
```
**Without codegen**:
```
res0: Seq[(Int, Long)] = List((10000000,15736), (100000000,150399))
```

Closes #28715 from karuppayya/SPARK-31897.

Lead-authored-by: Karuppayya Rajendran <karuppayya1990@gmail.com>
Co-authored-by: Karuppayya Rajendran <karuppayya.rajendran@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-03-18 20:50:28 -07:00
Kousuke Saruta 07ee73234f [SPARK-34747][SQL][DOCS] Add virtual operators to the built-in function document
### What changes were proposed in this pull request?

This PR fix an issue that virtual operators (`||`, `!=`, `<>`, `between` and `case`) are absent from the Spark SQL Built-in functions document.

### Why are the changes needed?

The document should explain about all the supported built-in operators.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Built the document with `SKIP_SCALADOC=1 SKIP_RDOC=1 SKIP_PYTHONDOC=1 bundler exec jekyll build` and then, confirmed the document.

![neq1](https://user-images.githubusercontent.com/4736016/111192859-e2e76380-85fc-11eb-89c9-75916a5e856a.png)
![neq2](https://user-images.githubusercontent.com/4736016/111192874-e7ac1780-85fc-11eb-9a9b-c504265b373f.png)
![between](https://user-images.githubusercontent.com/4736016/111192898-eda1f880-85fc-11eb-992d-cf80c544ec27.png)
![case](https://user-images.githubusercontent.com/4736016/111192918-f266ac80-85fc-11eb-9306-5dbc413a0cdb.png)
![double_pipe](https://user-images.githubusercontent.com/4736016/111192952-fb577e00-85fc-11eb-932e-385e5c2a5205.png)

Closes #31841 from sarutak/builtin-op-doc.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-19 10:19:26 +09:00
Cheng Su 8207e2f65c [SPARK-34781][SQL] Eliminate LEFT SEMI/ANTI joins to its left child side in AQE
### What changes were proposed in this pull request?

In `EliminateJoinToEmptyRelation.scala`, we can extend it to cover more cases for LEFT SEMI and LEFT ANI joins:

* Join is left semi join, join right side is non-empty and condition is empty. Eliminate join to its left side.
* Join is left anti join, join right side is empty. Eliminate join to its left side.

Given we eliminate join to its left side here, renaming the current optimization rule to `EliminateUnnecessaryJoin` instead.
In addition, also change to use `checkRowCount()` to check run time row count, instead of using `EmptyHashedRelation`. So this can cover `BroadcastNestedLoopJoin` as well. (`BroadcastNestedLoopJoin`'s broadcast side is `Array[InternalRow]`, not `HashedRelation`).

### Why are the changes needed?

Cover more join cases, and improve query performance for affected queries.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests in `AdaptiveQueryExecSuite.scala`.

Closes #31873 from c21/aqe-join.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-03-19 09:41:52 +09:00
zengruios 5570f817b2 [SPARK-34760][EXAMPLES] Replace favorite_color with age in JavaSQLDataSourceExample
### What changes were proposed in this pull request?
In JavaSparkSQLExample when excecute 'peopleDF.write().partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("people_partitioned_bucketed");'
throws Exception: 'Exception in thread "main" org.apache.spark.sql.AnalysisException: partition column favorite_color is not defined in table people_partitioned_bucketed, defined table columns are: age, name;'
Change the column favorite_color to age.

### Why are the changes needed?
Run JavaSparkSQLExample successfully.

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
test in JavaSparkSQLExample .

Closes #31851 from zengruios/SPARK-34760.

Authored-by: zengruios <578395184@qq.com>
Signed-off-by: Kent Yao <yao@apache.org>
2021-03-18 22:53:58 +08:00
yangjie01 2e836cdb59 [SPARK-34774][BUILD] Ensure change-scala-version.sh update scala.version in parent POM correctly
### What changes were proposed in this pull request?
After SPARK-34507,  execute` change-scala-version.sh` script will update `scala.version` in parent pom, but if we execute the following commands in order:

```
dev/change-scala-version.sh 2.13
dev/change-scala-version.sh 2.12
git status
```

there will generate git diff as follow:

```
diff --git a/pom.xml b/pom.xml
index ddc4ce2f68..f43d8c8f78 100644
--- a/pom.xml
+++ b/pom.xml
 -162,7 +162,7
     <commons.math3.version>3.4.1</commons.math3.version>

     <commons.collections.version>3.2.2</commons.collections.version>
-    <scala.version>2.12.10</scala.version>
+    <scala.version>2.13.5</scala.version>
     <scala.binary.version>2.12</scala.binary.version>
     <scalatest-maven-plugin.version>2.0.0</scalatest-maven-plugin.version>
     <scalafmt.parameters>--test</scalafmt.parameters>
```

seem 'scala.version' property was not update correctly.

So this pr add an extra 'scala.version' to scala-2.12 profile to ensure change-scala-version.sh can update the public `scala.version` property correctly.

### Why are the changes needed?
Bug fix.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
**Manual test**

Execute the following commands in order:

```
dev/change-scala-version.sh 2.13
dev/change-scala-version.sh 2.12
git status
```

**Before**

```
diff --git a/pom.xml b/pom.xml
index ddc4ce2f68..f43d8c8f78 100644
--- a/pom.xml
+++ b/pom.xml
 -162,7 +162,7
     <commons.math3.version>3.4.1</commons.math3.version>

     <commons.collections.version>3.2.2</commons.collections.version>
-    <scala.version>2.12.10</scala.version>
+    <scala.version>2.13.5</scala.version>
     <scala.binary.version>2.12</scala.binary.version>
     <scalatest-maven-plugin.version>2.0.0</scalatest-maven-plugin.version>
     <scalafmt.parameters>--test</scalafmt.parameters>
```

**After**

No git diff.

Closes #31865 from LuciferYang/SPARK-34774.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-03-18 07:33:23 -05:00
yi.wu d99135b66a [SPARK-34741][SQL] MergeIntoTable should avoid ambiguous reference in UpdateAction
### What changes were proposed in this pull request?

This PR proposes to deduplicate the source table when there're conflicting attributes between the target table and the source table.

### Why are the changes needed?

When resolving the `UpdateAction`, which could reference attributes from both target and source tables,  Spark should know clearly where the attribute comes from when there're conflicting attributes instead of picking up a random one.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a unit test and updated existing tests.

Closes #31835 from Ngone51/dedup-MergeIntoTable.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-18 15:54:41 +08:00
Bo Zhang 86ea520320 [SPARK-34757][CORE][DEPLOY] Ignore cache for SNAPSHOT dependencies in spark-submit
### What changes were proposed in this pull request?
This change is to ignore cache for SNAPSHOT dependencies in spark-submit.

### Why are the changes needed?
When spark-submit is executed with --packages, it will not download the dependency jars when they are available in cache (e.g. ivy cache), even when the dependencies are SNAPSHOT.

This might block developers who work on external modules in Spark (e.g. spark-avro), since they need to remove the cache manually every time when they update the code during developments (which generates SNAPSHOT jars). Without knowing this, they could be blocked wondering why their code changes are not reflected in spark-submit executions.

### Does this PR introduce _any_ user-facing change?
Yes. With this change, developers/users who run spark-submit with SNAPSHOT dependencies do not need to remove the cache every time when the SNAPSHOT dependencies are updated.

### How was this patch tested?
Added a unit test.

Closes #31849 from bozhang2820/spark-submit-cache-ignore.

Authored-by: Bo Zhang <bo.zhang@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-18 15:32:29 +09:00
Luan 25e7d1ceee [SPARK-34728][SQL] Remove all SQLConf.get if extends from SQLConfHelper
### What changes were proposed in this pull request?

Remove all SQLConf.get to conf if extends from SQLConfHelper

### Why are the changes needed?

Clean up code.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit tests.

Closes #31822 from leoluan2009/SPARK-34728.

Authored-by: Luan <luanxuedong2009@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-18 15:04:41 +09:00
Bruce Robbins f8a8b340b3 [SPARK-34731][CORE] Avoid ConcurrentModificationException when redacting properties in EventLoggingListener
### What changes were proposed in this pull request?

Change DAGScheduler to pass a clone of the Properties object, rather than the original object, to the SparkListenerJobStart event.

### Why are the changes needed?

 DAGScheduler might modify the Properties object (e.g., in addPySparkConfigsToProperties) after firing off the SparkListenerJobStart event. Since the handler for that event (onJobStart in EventLoggingListener) will iterate over the elements of the Property object, this sometimes results in a ConcurrentModificationException.

This can be demonstrated using these steps:
```
$ bin/spark-shell --conf spark.ui.showConsoleProgress=false \
--conf spark.executor.cores=1 --driver-memory 4g --conf \
"spark.ui.showConsoleProgress=false" \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=/tmp/spark-events
...
scala> (0 to 500).foreach { i =>
     |   val df = spark.range(0, 20000).toDF("a")
     |   df.filter("a > 12").count
     | }
21/03/12 18:16:44 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
java.util.ConcurrentModificationException
	at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
```

I've not actually seen a ConcurrentModificationException in onStageSubmitted, only in onJobStart. However, they both iterate over the Properties object, so for safety's sake I pass a clone to SparkListenerStageSubmitted as well.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By repeatedly running the reproduction steps from above.

Closes #31826 from bersprockets/elconcurrent.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-18 14:59:57 +09:00
yi.wu 4d90c5dc0e [SPARK-34087][SQL] Fix memory leak of ExecutionListenerBus
### What changes were proposed in this pull request?

This PR proposes an alternative way to fix the memory leak of `ExecutionListenerBus`, which would automatically clean them up.

Basically, the idea is to add `registerSparkListenerForCleanup` to `ContextCleaner`, so we can remove the `ExecutionListenerBus` from `LiveListenerBus` when the `SparkSession` is GC'ed.

On the other hand, to make the `SparkSession` GC-able, we need to get rid of the reference of `SparkSession` in `ExecutionListenerBus`. Therefore, we introduced the `sessionUUID`, which is a unique identifier for SparkSession, to replace the  `SparkSession` object.

Note that, the proposal wouldn't take effect when `spark.cleaner.referenceTracking=false` since it depends on `ContextCleaner`.

### Why are the changes needed?

Fix the memory leak caused by `ExecutionListenerBus` mentioned in SPARK-34087.

### Does this PR introduce _any_ user-facing change?

Yes, save memory for users.

### How was this patch tested?

Added unit test.

Closes #31839 from Ngone51/fix-mem-leak-of-ExecutionListenerBus.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-18 13:27:03 +09:00
Kousuke Saruta c5cadfefdf [SPARK-34762][BUILD] Fix the build failure with Scala 2.13 which is related to commons-cli
### What changes were proposed in this pull request?

This PR fixes the build failure with Scala 2.13 which is related to `commons-cli`.
The last few days, build with Scala 2.13 on GA continues to fail and the error message says like as follows.
```
[error] /home/runner/work/spark/spark/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java:26:1:  error: package org.apache.commons.cli does not exist
1278[error] import org.apache.commons.cli.GnuParser;
```
The reason is that `mvn help` in `change-scala-version.sh` downloads the POM file of `commons-cli` but doesn't download the JAR file, leading the build failure.

This PR also adds `commons-cli` to the dependencies explicitly because HiveThriftServer depends on it.
### Why are the changes needed?

Expect to fix the build failure with Scala 2.13.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I confirmed that build successfully finishes with Scala 2.13 on my laptop.
```
find ~/.m2 -name commons-cli -exec rm -rf {} \;
find ~/.ivy2 -name commons-cli -exec rm -rf {} \;
find ~/.cache/ -name commons-cli -exec rm -rf {} \; // For Linux
find ~/Library/Caches -name commons-cli -exec rm -rf {} \; // For macOS

dev/change-scala-version 2.13
./build/sbt -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.13 clean compile test:compile
```

Closes #31862 from sarutak/commons-cli.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-18 12:31:50 +09:00
gengjiaan 569fb133d0 [SPARK-33602][SQL] Group exception messages in execution/datasources
### What changes were proposed in this pull request?
This PR group exception messages in `/core/src/main/scala/org/apache/spark/sql/execution/datasources`.

### Why are the changes needed?
It will largely help with standardization of error messages and its maintenance.

### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.

### How was this patch tested?
No new tests - pass all original tests to make sure it doesn't break any existing behavior.

Closes #31757 from beliefer/SPARK-33602.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-17 14:04:02 +00:00
Wenchen Fan 9f7b0a035b [SPARK-34758][SQL] Simplify Analyzer.resolveLiteralFunction
### What changes were proposed in this pull request?

This PR simplifies `Analyzer.resolveLiteralFunction` to always create the `Alias`. The caller side will remove the `Alias` if it's not necessary.

### Why are the changes needed?

code simplification.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #31844 from cloud-fan/minor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-03-17 21:26:44 +09:00