## What changes were proposed in this pull request?
R Structured Streaming API for withWatermark, trigger, partitionBy
## How was this patch tested?
manual, unit tests
Author: Felix Cheung <felixcheung_m@hotmail.com>
Closes#20129 from felixcheung/rwater.
## What changes were proposed in this pull request?
move `ColumnVector` and related classes to `org.apache.spark.sql.vectorized`, and improve the document.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#20116 from cloud-fan/column-vector.
## What changes were proposed in this pull request?
When overwriting a partitioned table with dynamic partition columns, the behavior is different between data source and hive tables.
data source table: delete all partition directories that match the static partition values provided in the insert statement.
hive table: only delete partition directories which have data written into it
This PR adds a new config to make users be able to choose hive's behavior.
## How was this patch tested?
new tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#18714 from cloud-fan/overwrite-partition.
## What changes were proposed in this pull request?
Currently, our CREATE TABLE syntax require the EXACT order of clauses. It is pretty hard to remember the exact order. Thus, this PR is to make optional clauses order insensitive for `CREATE TABLE` SQL statement.
```
CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name1 col_type1 [COMMENT col_comment1], ...)]
USING datasource
[OPTIONS (key1=val1, key2=val2, ...)]
[PARTITIONED BY (col_name1, col_name2, ...)]
[CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1=val1, key2=val2, ...)]
[AS select_statement]
```
The proposal is to make the following clauses order insensitive.
```
[OPTIONS (key1=val1, key2=val2, ...)]
[PARTITIONED BY (col_name1, col_name2, ...)]
[CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1=val1, key2=val2, ...)]
```
The same idea is also applicable to Create Hive Table.
```
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name1[:] col_type1 [COMMENT col_comment1], ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name2[:] col_type2 [COMMENT col_comment2], ...)]
[ROW FORMAT row_format]
[STORED AS file_format]
[LOCATION path]
[TBLPROPERTIES (key1=val1, key2=val2, ...)]
[AS select_statement]
```
The proposal is to make the following clauses order insensitive.
```
[COMMENT table_comment]
[PARTITIONED BY (col_name2[:] col_type2 [COMMENT col_comment2], ...)]
[ROW FORMAT row_format]
[STORED AS file_format]
[LOCATION path]
[TBLPROPERTIES (key1=val1, key2=val2, ...)]
```
## How was this patch tested?
Added test cases
Author: gatorsmile <gatorsmile@gmail.com>
Closes#20133 from gatorsmile/createDataSourceTableDDL.
## What changes were proposed in this pull request?
stageAttemptId added in TaskContext and corresponding construction modification
## How was this patch tested?
Added a new test in TaskContextSuite, two cases are tested:
1. Normal case without failure
2. Exception case with resubmitted stages
Link to [SPARK-22897](https://issues.apache.org/jira/browse/SPARK-22897)
Author: Xianjin YE <advancedxy@gmail.com>
Closes#20082 from advancedxy/SPARK-22897.
## What changes were proposed in this pull request?
This change adds `ArrayType` support for working with Arrow in pyspark when creating a DataFrame, calling `toPandas()`, and using vectorized `pandas_udf`.
## How was this patch tested?
Added new Python unit tests using Array data.
Author: Bryan Cutler <cutlerb@gmail.com>
Closes#20114 from BryanCutler/arrow-ArrayType-support-SPARK-22530.
## What changes were proposed in this pull request?
Currently, we do not guarantee an order evaluation of conjuncts in either Filter or Join operator. This is also true to the mainstream RDBMS vendors like DB2 and MS SQL Server. Thus, we should also push down the deterministic predicates that are after the first non-deterministic, if possible.
## How was this patch tested?
Updated the existing test cases.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#20069 from gatorsmile/morePushDown.
## What changes were proposed in this pull request?
There is already test using window spilling, but the test coverage is not ideal.
In this PR the already existing test was fixed and additional cases added.
## How was this patch tested?
Automated: Pass the Jenkins.
Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Closes#20022 from gaborgsomogyi/SPARK-22363.
## What changes were proposed in this pull request?
This pr modified `concat` to concat binary inputs into a single binary output.
`concat` in the current master always output data as a string. But, in some databases (e.g., PostgreSQL), if all inputs are binary, `concat` also outputs binary.
## How was this patch tested?
Added tests in `SQLQueryTestSuite` and `TypeCoercionSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#19977 from maropu/SPARK-22771.
## What changes were proposed in this pull request?
ML regression package testsuite add StructuredStreaming test
In order to make testsuite easier to modify, new helper function added in `MLTest`:
```
def testTransformerByGlobalCheckFunc[A : Encoder](
dataframe: DataFrame,
transformer: Transformer,
firstResultCol: String,
otherResultCols: String*)
(globalCheckFunction: Seq[Row] => Unit): Unit
```
## How was this patch tested?
N/A
Author: WeichenXu <weichen.xu@databricks.com>
Author: Bago Amirbekian <bago@databricks.com>
Closes#19979 from WeichenXu123/ml_stream_test.
## What changes were proposed in this pull request?
The issue has been raised in two Jira tickets: [SPARK-21657](https://issues.apache.org/jira/browse/SPARK-21657), [SPARK-16998](https://issues.apache.org/jira/browse/SPARK-16998). Basically, what happens is that in collection generators like explode/inline we create many rows from each row. Currently each exploded row contains also the column on which it was created. This causes, for example, if we have a 10k array in one row that this array will get copy 10k times - to each of the row. this results a qudratic memory consumption. However, it is a common case that the original column gets projected out after the explode, so we can avoid duplicating it.
In this solution we propose to identify this situation in the optimizer and turn on a flag for omitting the original column in the generation process.
## How was this patch tested?
1. We added a benchmark test to MiscBenchmark that shows x16 improvement in runtimes.
2. We ran some of the other tests in MiscBenchmark and they show 15% improvements.
3. We ran this code on a specific case from our production data with rows containing arrays of size ~200k and it reduced the runtime from 6 hours to 3 mins.
Author: oraviv <oraviv@paypal.com>
Author: uzadude <ohad.raviv@gmail.com>
Author: uzadude <15645757+uzadude@users.noreply.github.com>
Closes#19683 from uzadude/optimize_explode.
## What changes were proposed in this pull request?
When there are no broadcast hints, the current spark strategies will prefer to building the right side, without considering the sizes of the two tables. This patch added the logic to consider the sizes of the two tables for the build side. To make the logic clear, the build side is determined by two steps:
1. If there are broadcast hints, the build side is determined by `broadcastSideByHints`;
2. If there are no broadcast hints, the build side is determined by `broadcastSideBySizes`;
3. If the broadcast is disabled by the config, it falls back to the next cases.
## How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Feng Liu <fengliu@databricks.com>
Closes#20099 from liufengdb/fix-spark-strategies.
## What changes were proposed in this pull request?
With #19474, children of insertion commands are missing in UI.
To fix it:
1. Create a new physical plan `DataWritingCommandExec` to exec `DataWritingCommand` with children. So that the other commands won't be affected.
2. On creation of `DataWritingCommand`, a new field `allColumns` must be specified, which is the output of analyzed plan.
3. In `FileFormatWriter`, the output schema will use `allColumns` instead of the output of optimized plan.
Before code changes:
![2017-12-19 10 27 10](https://user-images.githubusercontent.com/1097932/34161850-d2fd0acc-e50c-11e7-898a-177154fe7d8e.png)
After code changes:
![2017-12-19 10 27 04](https://user-images.githubusercontent.com/1097932/34161865-de23de26-e50c-11e7-9131-0c32f7b7b749.png)
## How was this patch tested?
Unit test
Author: Wang Gengliang <ltnwgl@gmail.com>
Closes#20020 from gengliangwang/insert.
## What changes were proposed in this pull request?
Escape of escape should be considered when using the UniVocity csv encoding/decoding library.
Ref: https://github.com/uniVocity/univocity-parsers#escaping-quote-escape-characters
One option is added for reading and writing CSV: `escapeQuoteEscaping`
## How was this patch tested?
Unit test added.
Author: soonmok-kwon <soonmok.kwon@navercorp.com>
Closes#20004 from ep1804/SPARK-22818.
## What changes were proposed in this pull request?
Test Coverage for `DateTimeOperations`, this is a Sub-tasks for [SPARK-22722](https://issues.apache.org/jira/browse/SPARK-22722).
## How was this patch tested?
N/A
Author: Yuming Wang <wgyumg@gmail.com>
Closes#20061 from wangyum/SPARK-22890.
## What changes were proposed in this pull request?
This PR cleans up a few Java linter errors for Apache Spark 2.3 release.
## How was this patch tested?
```bash
$ dev/lint-java
Using `mvn` from path: /usr/local/bin/mvn
Checkstyle checks passed.
```
We can see the result from [Travis CI](https://travis-ci.org/dongjoon-hyun/spark/builds/322470787), too.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#20101 from dongjoon-hyun/fix-java-lint.
## What changes were proposed in this pull request?
For empty/null column, the result of `ApproximatePercentile` is null. Then in `ApproxCountDistinctForIntervals`, a `MatchError` (for `endpoints`) will be thrown if we try to generate histogram for that column. Besides, there is no need to generate histogram for such column. In this patch, we exclude such column when generating histogram.
## How was this patch tested?
Enhanced test cases for empty/null columns.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#20102 from wzhfy/no_record_hgm_bug.
## What changes were proposed in this pull request?
This PR addresses additional review comments in #19811
## How was this patch tested?
Existing test suites
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#20036 from kiszk/SPARK-18066-followup.
## What changes were proposed in this pull request?
Test coverage for arithmetic operations leading to:
1. Precision loss
2. Overflow
Moreover, tests for casting bad string to other input types and for using bad string as operators of some functions.
## How was this patch tested?
added tests
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#20084 from mgaido91/SPARK-22904.
## What changes were proposed in this pull request?
`DateTimeOperations` accept [`StringType`](ae998ec2b5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala (L669)), but:
```
spark-sql> SELECT '2017-12-24' + interval 2 months 2 seconds;
Error in query: cannot resolve '(CAST('2017-12-24' AS DOUBLE) + interval 2 months 2 seconds)' due to data type mismatch: differing types in '(CAST('2017-12-24' AS DOUBLE) + interval 2 months 2 seconds)' (double and calendarinterval).; line 1 pos 7;
'Project [unresolvedalias((cast(2017-12-24 as double) + interval 2 months 2 seconds), None)]
+- OneRowRelation
spark-sql>
```
After this PR:
```
spark-sql> SELECT '2017-12-24' + interval 2 months 2 seconds;
2018-02-24 00:00:02
Time taken: 0.2 seconds, Fetched 1 row(s)
```
## How was this patch tested?
unit tests
Author: Yuming Wang <wgyumg@gmail.com>
Closes#20067 from wangyum/SPARK-22894.
## What changes were proposed in this pull request?
In SPARK-20586 the flag `deterministic` was added to Scala UDF, but it is not available for python UDF. This flag is useful for cases when the UDF's code can return different result with the same input. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query. This can lead to unexpected behavior.
This PR adds the deterministic flag, via the `asNondeterministic` method, to let the user mark the function as non-deterministic and therefore avoid the optimizations which might lead to strange behaviors.
## How was this patch tested?
Manual tests:
```
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>> df_br = spark.createDataFrame([{'name': 'hello'}])
>>> import random
>>> udf_random_col = udf(lambda: int(100*random.random()), IntegerType()).asNondeterministic()
>>> df_br = df_br.withColumn('RAND', udf_random_col())
>>> random.seed(1234)
>>> udf_add_ten = udf(lambda rand: rand + 10, IntegerType())
>>> df_br.withColumn('RAND_PLUS_TEN', udf_add_ten('RAND')).show()
+-----+----+-------------+
| name|RAND|RAND_PLUS_TEN|
+-----+----+-------------+
|hello| 3| 13|
+-----+----+-------------+
```
Author: Marco Gaido <marcogaido91@gmail.com>
Author: Marco Gaido <mgaido@hortonworks.com>
Closes#19929 from mgaido91/SPARK-22629.
## What changes were proposed in this pull request?
Decimal type is not yet supported in `ArrowWriter`.
This is adding the decimal type support.
## How was this patch tested?
Added a test to `ArrowConvertersSuite`.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#18754 from ueshin/issues/SPARK-21552.
## What changes were proposed in this pull request?
We should use `dataType.simpleString` to unified the data type mismatch message:
Before:
```
spark-sql> select cast(1 as binary);
Error in query: cannot resolve 'CAST(1 AS BINARY)' due to data type mismatch: cannot cast IntegerType to BinaryType; line 1 pos 7;
```
After:
```
park-sql> select cast(1 as binary);
Error in query: cannot resolve 'CAST(1 AS BINARY)' due to data type mismatch: cannot cast int to binary; line 1 pos 7;
```
## How was this patch tested?
Exist test.
Author: Yuming Wang <wgyumg@gmail.com>
Closes#20064 from wangyum/SPARK-22893.
## What changes were proposed in this pull request?
Basic continuous execution, supporting map/flatMap/filter, with commits and advancement through RPC.
## How was this patch tested?
new unit-ish tests (exercising execution end to end)
Author: Jose Torres <jose@databricks.com>
Closes#19984 from jose-torres/continuous-impl.
When one execution has multiple jobs, we need to append to the set of
stages, not replace them on every job.
Added unit test and ran existing tests on jenkins
Author: Imran Rashid <irashid@cloudera.com>
Closes#20047 from squito/SPARK-22861.
## What changes were proposed in this pull request?
This is a followup PR of https://github.com/apache/spark/pull/19257 where gatorsmile had left couple comments wrt code style.
## How was this patch tested?
Doesn't change any functionality. Will depend on build to see if no checkstyle rules are violated.
Author: Tejas Patil <tejasp@fb.com>
Closes#20041 from tejasapatil/followup_19257.
## What changes were proposed in this pull request?
Test Coverage for `WindowFrameCoercion` and `DecimalPrecision`, this is a Sub-tasks for [SPARK-22722](https://issues.apache.org/jira/browse/SPARK-22722).
## How was this patch tested?
N/A
Author: Yuming Wang <wgyumg@gmail.com>
Closes#20008 from wangyum/SPARK-22822.
## What changes were proposed in this pull request?
In https://github.com/apache/spark/pull/19681 we introduced a new interface called `AppStatusPlugin`, to register listeners and set up the UI for both live and history UI.
However I think it's an overkill for live UI. For example, we should not register `SQLListener` if users are not using SQL functions. Previously we register the `SQLListener` and set up SQL tab when `SparkSession` is firstly created, which indicates users are going to use SQL functions. But in #19681 , we register the SQL functions during `SparkContext` creation. The same thing should apply to streaming too.
I think we should keep the previous behavior, and only use this new interface for history server.
To reflect this change, I also rename the new interface to `SparkHistoryUIPlugin`
This PR also refines the tests for sql listener.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19981 from cloud-fan/listener.
## What changes were proposed in this pull request?
Upgrade Spark to Arrow 0.8.0 for Java and Python. Also includes an upgrade of Netty to 4.1.17 to resolve dependency requirements.
The highlights that pertain to Spark for the update from Arrow versoin 0.4.1 to 0.8.0 include:
* Java refactoring for more simple API
* Java reduced heap usage and streamlined hot code paths
* Type support for DecimalType, ArrayType
* Improved type casting support in Python
* Simplified type checking in Python
## How was this patch tested?
Existing tests
Author: Bryan Cutler <cutlerb@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>
Closes#19884 from BryanCutler/arrow-upgrade-080-SPARK-22324.
## What changes were proposed in this pull request?
Introduce a new interface `SessionConfigSupport` for `DataSourceV2`, it can help to propagate session configs with the specified key-prefix to all data source operations in this session.
## How was this patch tested?
Add new test suite `DataSourceV2UtilsSuite`.
Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Closes#19861 from jiangxb1987/datasource-configs.
## What changes were proposed in this pull request?
Some users depend on source compatibility with the org.apache.spark.sql.execution.streaming.Offset class. Although this is not a stable interface, we can keep it in place for now to simplify upgrades to 2.3.
Author: Jose Torres <jose@databricks.com>
Closes#20012 from joseph-torres/binary-compat.
## What changes were proposed in this pull request?
Like `Parquet`, users can use `ORC` with Apache Spark structured streaming. This PR adds `orc()` to `DataStreamReader`(Scala/Python) in order to support creating streaming dataset with ORC file format more easily like the other file formats. Also, this adds a test coverage for ORC data source and updates the document.
**BEFORE**
```scala
scala> spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
<console>:24: error: value orc is not a member of org.apache.spark.sql.streaming.DataStreamReader
spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
```
**AFTER**
```scala
scala> spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
res0: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper678b3746
scala>
-------------------------------------------
Batch: 0
-------------------------------------------
+---+
| a|
+---+
| 1|
+---+
```
## How was this patch tested?
Pass the newly added test cases.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19975 from dongjoon-hyun/SPARK-22781.
## What changes were proposed in this pull request?
This change adds local checkpoint support to datasets and respective bind from Python Dataframe API.
If reliability requirements can be lowered to favor performance, as in cases of further quick transformations followed by a reliable save, localCheckpoints() fit very well.
Furthermore, at the moment Reliable checkpoints still incur double computation (see #9428)
In general it makes the API more complete as well.
## How was this patch tested?
Python land quick use case:
```python
>>> from time import sleep
>>> from pyspark.sql import types as T
>>> from pyspark.sql import functions as F
>>> def f(x):
sleep(1)
return x*2
...:
>>> df1 = spark.range(30, numPartitions=6)
>>> df2 = df1.select(F.udf(f, T.LongType())("id"))
>>> %time _ = df2.collect()
CPU times: user 7.79 ms, sys: 5.84 ms, total: 13.6 ms
Wall time: 12.2 s
>>> %time df3 = df2.localCheckpoint()
CPU times: user 2.38 ms, sys: 2.3 ms, total: 4.68 ms
Wall time: 10.3 s
>>> %time _ = df3.collect()
CPU times: user 5.09 ms, sys: 410 µs, total: 5.5 ms
Wall time: 148 ms
>>> sc.setCheckpointDir(".")
>>> %time df3 = df2.checkpoint()
CPU times: user 4.04 ms, sys: 1.63 ms, total: 5.67 ms
Wall time: 20.3 s
```
Author: Fernando Pereira <fernando.pereira@epfl.ch>
Closes#19805 from ferdonline/feature_dataset_localCheckpoint.
## What changes were proposed in this pull request?
Currently, the task memory manager throws an OutofMemory error when there is an IO exception happens in spill() - https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java#L194. Similarly there any many other places in code when if a task is not able to acquire memory due to an exception we throw an OutofMemory error which kills the entire executor and hence failing all the tasks that are running on that executor instead of just failing one single task.
## How was this patch tested?
Unit tests
Author: Sital Kedia <skedia@fb.com>
Closes#20014 from sitalkedia/skedia/upstream_SPARK-22827.
## What changes were proposed in this pull request?
Test Coverage for `WidenSetOperationTypes`, `BooleanEquality`, `StackCoercion` and `Division`, this is a Sub-tasks for [SPARK-22722](https://issues.apache.org/jira/browse/SPARK-22722).
## How was this patch tested?
N/A
Author: Yuming Wang <wgyumg@gmail.com>
Closes#20006 from wangyum/SPARK-22821.
## What changes were proposed in this pull request?
This PR is follow-on of #19518. This PR tries to reduce the number of constant pool entries used for accessing mutable state.
There are two directions:
1. Primitive type variables should be allocated at the outer class due to better performance. Otherwise, this PR allocates an array.
2. The length of allocated array is up to 32768 due to avoiding usage of constant pool entry at access (e.g. `mutableStateArray[32767]`).
Here are some discussions to determine these directions.
1. [[1]](https://github.com/apache/spark/pull/19518#issuecomment-346690464), [[2]](https://github.com/apache/spark/pull/19518#issuecomment-346690642), [[3]](https://github.com/apache/spark/pull/19518#issuecomment-346828180), [[4]](https://github.com/apache/spark/pull/19518#issuecomment-346831544), [[5]](https://github.com/apache/spark/pull/19518#issuecomment-346857340)
2. [[6]](https://github.com/apache/spark/pull/19518#issuecomment-346729172), [[7]](https://github.com/apache/spark/pull/19518#issuecomment-346798358), [[8]](https://github.com/apache/spark/pull/19518#issuecomment-346870408)
This PR modifies `addMutableState` function in the `CodeGenerator` to check if the declared state can be easily initialized compacted into an array. We identify three types of states that cannot compacted:
- Primitive type state (ints, booleans, etc) if the number of them does not exceed threshold
- Multiple-dimensional array type
- `inline = true`
When `useFreshName = false`, the given name is used.
Many codes were ported from #19518. Many efforts were put here. I think this PR should credit to bdrillard
With this PR, the following code is generated:
```
/* 005 */ class SpecificMutableProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
/* 006 */
/* 007 */ private Object[] references;
/* 008 */ private InternalRow mutableRow;
/* 009 */ private boolean isNull_0;
/* 010 */ private boolean isNull_1;
/* 011 */ private boolean isNull_2;
/* 012 */ private int value_2;
/* 013 */ private boolean isNull_3;
...
/* 10006 */ private int value_4999;
/* 10007 */ private boolean isNull_5000;
/* 10008 */ private int value_5000;
/* 10009 */ private InternalRow[] mutableStateArray = new InternalRow[2];
/* 10010 */ private boolean[] mutableStateArray1 = new boolean[7001];
/* 10011 */ private int[] mutableStateArray2 = new int[1001];
/* 10012 */ private UTF8String[] mutableStateArray3 = new UTF8String[6000];
/* 10013 */
...
/* 107956 */ private void init_176() {
/* 107957 */ isNull_4986 = true;
/* 107958 */ value_4986 = -1;
...
/* 108004 */ }
...
```
## How was this patch tested?
Added a new test case to `GeneratedProjectionSuite`
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#19811 from kiszk/SPARK-18016.
## What changes were proposed in this pull request?
When calling explain on a query, the output can contain sensitive information. We should provide an admin/user to redact such information.
Before this PR, the plan of SS is like this
```
== Physical Plan ==
*HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#12L])
+- StateStoreSave [value#6], state info [ checkpoint = file:/private/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw900000gn/T/temporary-91c6fac0-609f-4bc8-ad57-52c189f06797/state, runId = 05a4b3af-f02c-40f8-9ff9-a3e18bae496f, opId = 0, ver = 0, numPartitions = 5], Complete, 0
+- *HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#18L])
+- StateStoreRestore [value#6], state info [ checkpoint = file:/private/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw900000gn/T/temporary-91c6fac0-609f-4bc8-ad57-52c189f06797/state, runId = 05a4b3af-f02c-40f8-9ff9-a3e18bae496f, opId = 0, ver = 0, numPartitions = 5]
+- *HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#18L])
+- Exchange hashpartitioning(value#6, 5)
+- *HashAggregate(keys=[value#6], functions=[partial_count(1)], output=[value#6, count#18L])
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
+- *MapElements <function1>, obj#5: java.lang.String
+- *DeserializeToObject value#30.toString, obj#4: java.lang.String
+- LocalTableScan [value#30]
```
After this PR, we can get the following output if users set `spark.redaction.string.regex` to `file:/[\\w_]+`
```
== Physical Plan ==
*HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#12L])
+- StateStoreSave [value#6], state info [ checkpoint = *********(redacted)/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw900000gn/T/temporary-e7da9b7d-3ec0-474d-8b8c-927f7d12ed72/state, runId = 8a9c3761-93d5-4896-ab82-14c06240dcea, opId = 0, ver = 0, numPartitions = 5], Complete, 0
+- *HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#32L])
+- StateStoreRestore [value#6], state info [ checkpoint = *********(redacted)/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw900000gn/T/temporary-e7da9b7d-3ec0-474d-8b8c-927f7d12ed72/state, runId = 8a9c3761-93d5-4896-ab82-14c06240dcea, opId = 0, ver = 0, numPartitions = 5]
+- *HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#32L])
+- Exchange hashpartitioning(value#6, 5)
+- *HashAggregate(keys=[value#6], functions=[partial_count(1)], output=[value#6, count#32L])
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
+- *MapElements <function1>, obj#5: java.lang.String
+- *DeserializeToObject value#27.toString, obj#4: java.lang.String
+- LocalTableScan [value#27]
```
## How was this patch tested?
Added a test case
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19985 from gatorsmile/redactPlan.
## What changes were proposed in this pull request?
The current implementation of InMemoryRelation always uses the most expensive execution plan when writing cache
With CBO enabled, we can actually have a more exact estimation of the underlying table size...
## How was this patch tested?
existing test
Author: CodingCat <zhunansjtu@gmail.com>
Author: Nan Zhu <CodingCat@users.noreply.github.com>
Author: Nan Zhu <nanzhu@uber.com>
Closes#19864 from CodingCat/SPARK-22673.
This change restores the functionality that keeps a limited number of
different types (jobs, stages, etc) depending on configuration, to avoid
the store growing indefinitely over time.
The feature is implemented by creating a new type (ElementTrackingStore)
that wraps a KVStore and allows triggers to be set up for when elements
of a certain type meet a certain threshold. Triggers don't need to
necessarily only delete elements, but the current API is set up in a way
that makes that use case easier.
The new store also has a trigger for the "close" call, which makes it
easier for listeners to register code for cleaning things up and flushing
partial state to the store.
The old configurations for cleaning up the stored elements from the core
and SQL UIs are now active again, and the old unit tests are re-enabled.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19751 from vanzin/SPARK-20653.
## What changes were proposed in this pull request?
Test Coverage for `PromoteStrings` and `InConversion`, this is a Sub-tasks for [SPARK-22722](https://issues.apache.org/jira/browse/SPARK-22722).
## How was this patch tested?
N/A
Author: Yuming Wang <wgyumg@gmail.com>
Closes#20001 from wangyum/SPARK-22816.
## What changes were proposed in this pull request?
Basic tests for IfCoercion and CaseWhenCoercion
## How was this patch tested?
N/A
Author: Yuming Wang <wgyumg@gmail.com>
Closes#19949 from wangyum/SPARK-22762.
## What changes were proposed in this pull request?
Add a test suite to ensure all the [SSB (Star Schema Benchmark)](https://www.cs.umb.edu/~poneil/StarSchemaB.PDF) queries can be successfully analyzed, optimized and compiled without hitting the max iteration threshold.
## How was this patch tested?
Added `SSBQuerySuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#19990 from maropu/SPARK-22800.
## What changes were proposed in this pull request?
As the discussion in https://github.com/apache/spark/pull/16481 and https://github.com/apache/spark/pull/18975#discussion_r155454606
Currently the BaseRelation returned by `dataSource.writeAndRead` only used in `CreateDataSourceTableAsSelect`, planForWriting and writeAndRead has some common code paths.
In this patch I removed the writeAndRead function and added the getRelation function which only use in `CreateDataSourceTableAsSelectCommand` while saving data to non-existing table.
## How was this patch tested?
Existing UT
Author: Yuanjian Li <xyliyuanjian@gmail.com>
Closes#19941 from xuanyuanking/SPARK-22753.
## What changes were proposed in this pull request?
Add a test suite to ensure all the TPC-H queries can be successfully analyzed, optimized and compiled without hitting the max iteration threshold.
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19982 from gatorsmile/testTPCH.
## What changes were proposed in this pull request?
StreamExecution is now an abstract base class, which MicroBatchExecution (the current StreamExecution) inherits. When continuous processing is implemented, we'll have a new ContinuousExecution implementation of StreamExecution.
A few fields are also renamed to make them less microbatch-specific.
## How was this patch tested?
refactoring only
Author: Jose Torres <jose@databricks.com>
Closes#19926 from joseph-torres/continuous-refactor.
## What changes were proposed in this pull request?
In multiple text analysis problems, it is not often desirable for the rows to be split by "\n". There exists a wholeText reader for RDD API, and this JIRA just adds the same support for Dataset API.
## How was this patch tested?
Added relevant new tests for both scala and Java APIs
Author: Prashant Sharma <prashsh1@in.ibm.com>
Author: Prashant Sharma <prashant@apache.org>
Closes#14151 from ScrapCodes/SPARK-16496/wholetext.
## What changes were proposed in this pull request?
This PR adds check whether Java code generated by Catalyst can be compiled by `janino` correctly or not into `TPCDSQuerySuite`. Before this PR, this suite only checks whether analysis can be performed correctly or not.
This check will be able to avoid unexpected performance degrade by interpreter execution due to a Java compilation error.
## How was this patch tested?
Existing a test case, but updated it.
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#19971 from kiszk/SPARK-22774.
## What changes were proposed in this pull request?
`ColumnVector.anyNullsSet` is not called anywhere except tests, and we can easily replace it with `ColumnVector.numNulls > 0`
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19980 from cloud-fan/minor.
## What changes were proposed in this pull request?
These dictionary related APIs are special to `WritableColumnVector` and should not be in `ColumnVector`, which will be public soon.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19970 from cloud-fan/final.
SQLConf allows some callers to define a custom default value for
configs, and that complicates a little bit the handling of fallback
config entries, since most of the default value resolution is
hidden by the config code.
This change peaks into the internals of these fallback configs
to figure out the correct default value, and also returns the
current human-readable default when showing the default value
(e.g. through "set -v").
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19974 from vanzin/SPARK-22779.
## What changes were proposed in this pull request?
This PR provides DataSourceV2 API support for structured streaming, including new pieces needed to support continuous processing [SPARK-20928]. High level summary:
- DataSourceV2 includes new mixins to support micro-batch and continuous reads and writes. For reads, we accept an optional user specified schema rather than using the ReadSupportWithSchema model, because doing so would severely complicate the interface.
- DataSourceV2Reader includes new interfaces to read a specific microbatch or read continuously from a given offset. These follow the same setter pattern as the existing Supports* mixins so that they can work with SupportsScanUnsafeRow.
- DataReader (the per-partition reader) has a new subinterface ContinuousDataReader only for continuous processing. This reader has a special method to check progress, and next() blocks for new input rather than returning false.
- Offset, an abstract representation of position in a streaming query, is ported to the public API. (Each type of reader will define its own Offset implementation.)
- DataSourceV2Writer has a new subinterface ContinuousWriter only for continuous processing. Commits to this interface come tagged with an epoch number, as the execution engine will continue to produce new epoch commits as the task continues indefinitely.
Note that this PR does not propose to change the existing DataSourceV2 batch API, or deprecate the existing streaming source/sink internal APIs in spark.sql.execution.streaming.
## How was this patch tested?
Toy implementations of the new interfaces with unit tests.
Author: Jose Torres <jose@databricks.com>
Closes#19925 from joseph-torres/continuous-api.
## What changes were proposed in this pull request?
This pr fixed a compilation error of TPCDS `q75`/`q77` caused by #19813;
```
java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 371, Column 16: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 371, Column 16: Expression "bhj_matched" is not an rvalue
at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
```
## How was this patch tested?
Manually checked `q75`/`q77` can be properly compiled
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes#19969 from maropu/SPARK-22600-FOLLOWUP.
## What changes were proposed in this pull request?
See jira description for the bug : https://issues.apache.org/jira/browse/SPARK-22042
Fix done in this PR is: In `EnsureRequirements`, apply `ReorderJoinPredicates` over the input tree before doing its core logic. Since the tree is transformed bottom-up, we can assure that the children are resolved before doing `ReorderJoinPredicates`.
Theoretically this will guarantee to cover all such cases while keeping the code simple. My small grudge is for cosmetic reasons. This PR will look weird given that we don't call rules from other rules (not to my knowledge). I could have moved all the logic for `ReorderJoinPredicates` into `EnsureRequirements` but that will make it a but crowded. I am happy to discuss if there are better options.
## How was this patch tested?
Added a new test case
Author: Tejas Patil <tejasp@fb.com>
Closes#19257 from tejasapatil/SPARK-22042_ReorderJoinPredicates.
## What changes were proposed in this pull request?
We need to add some helper code to make testing ML transformers & models easier with streaming data. These tests might help us catch any remaining issues and we could encourage future PRs to use these tests to prevent new Models & Transformers from having issues.
I add a `MLTest` trait which extends `StreamTest` trait, and override `createSparkSession`. So ML testsuite can only extend `MLTest`, to use both ML & Stream test util functions.
I only modify one testcase in `LinearRegressionSuite`, for first pass review.
Link to #19746
## How was this patch tested?
`MLTestSuite` added.
Author: WeichenXu <weichen.xu@databricks.com>
Closes#19843 from WeichenXu123/ml_stream_test_helper.
## What changes were proposed in this pull request?
SPARK-22543 fixes the 64kb compile error for deeply nested expression for non-wholestage codegen. This PR extends it to support wholestage codegen.
This patch brings some util methods in to extract necessary parameters for an expression if it is split to a function.
The util methods are put in object `ExpressionCodegen` under `codegen`. The main entry is `getExpressionInputParams` which returns all necessary parameters to evaluate the given expression in a split function.
This util methods can be used to split expressions too. This is a TODO item later.
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19813 from viirya/reduce-expr-code-for-wholestage.
## What changes were proposed in this pull request?
We have two methods to reference an object `addReferenceMinorObj` and `addReferenceObj `. The latter creates a new global variable, which means new entries in the constant pool.
The PR unifies the two method in a single `addReferenceObj` which returns the code to access the object in the `references` array and doesn't add new mutable states.
## How was this patch tested?
added UTs.
Author: Marco Gaido <mgaido@hortonworks.com>
Closes#19916 from mgaido91/SPARK-22716.
In order to enable truncate for PostgreSQL databases in Spark JDBC, a change is needed to the query used for truncating a PostgreSQL table. By default, PostgreSQL will automatically truncate any descendant tables if a TRUNCATE query is executed. As this may result in (unwanted) side-effects, the query used for the truncate should be specified separately for PostgreSQL, specifying only to TRUNCATE a single table.
## What changes were proposed in this pull request?
Add `getTruncateQuery` function to `JdbcDialect.scala`, with default query. Overridden this function for PostgreSQL to only truncate a single table. Also sets `isCascadingTruncateTable` to false, as this will allow truncates for PostgreSQL.
## How was this patch tested?
Existing tests all pass. Added test for `getTruncateQuery`
Author: Daniel van der Ende <daniel.vanderende@gmail.com>
Closes#19911 from danielvdende/SPARK-22717.
## What changes were proposed in this pull request?
In the previous PRs, https://github.com/apache/spark/pull/17832 and https://github.com/apache/spark/pull/17835 , we convert `TIMESTAMP WITH TIME ZONE` and `TIME WITH TIME ZONE` to `TIMESTAMP` for all the JDBC sources. However, this conversion could be risky since it does not respect our SQL configuration `spark.sql.session.timeZone`.
In addition, each vendor might have different semantics for these two types. For example, Postgres simply returns `TIMESTAMP` types for `TIMESTAMP WITH TIME ZONE`. For such supports, we should do it case by case. This PR reverts the general support of `TIMESTAMP WITH TIME ZONE` and `TIME WITH TIME ZONE` for JDBC sources, except ORACLE Dialect.
When supporting the ORACLE's `TIMESTAMP WITH TIME ZONE`, we only support it when the JVM default timezone is the same as the user-specified configuration `spark.sql.session.timeZone` (whose default is the JVM default timezone). Now, we still treat `TIMESTAMP WITH TIME ZONE` as `TIMESTAMP` when fetching the values via the Oracle JDBC connector, whose client converts the timestamp values with time zone to the timestamp values using the local JVM default timezone (a test case is added to `OracleIntegrationSuite.scala` in this PR for showing the behavior). Thus, to avoid any future behavior change, we will not support it if JVM default timezone is different from `spark.sql.session.timeZone`
No regression because the previous two PRs were just merged to be unreleased master branch.
## How was this patch tested?
Added the test cases
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19939 from gatorsmile/timezoneUpdate.
## What changes were proposed in this pull request?
Before we deliver the Hive compatibility mode, we plan to write a set of test cases that can be easily run in both Spark and Hive sides. We can easily compare whether they are the same or not. When new typeCoercion rules are added, we also can easily track the changes. These test cases can also be backported to the previous Spark versions for determining the changes we made.
This PR is the first attempt for improving the test coverage for type coercion compatibility. We generate these test cases for our binary comparison and ImplicitTypeCasts based on the Apache Derby test cases in https://github.com/apache/derby/blob/10.14/java/testing/org/apache/derbyTesting/functionTests/tests/lang/implicitConversions.sql
## How was this patch tested?
N/A
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19918 from gatorsmile/typeCoercionTests.
## What changes were proposed in this pull request?
During https://github.com/apache/spark/pull/19882, `conf` is mistakenly used to switch ORC implementation between `native` and `hive`. To affect `OrcTest` correctly, `spark.conf` should be used.
## How was this patch tested?
Pass the tests.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19931 from dongjoon-hyun/SPARK-22672-2.
## What changes were proposed in this pull request?
Int96 data written by impala vs data written by hive & spark is stored slightly differently -- they use a different offset for the timezone. This adds an option "spark.sql.parquet.int96TimestampConversion" (false by default) to adjust timestamps if and only if the writer is impala (or more precisely, if the parquet file's "createdBy" metadata does not start with "parquet-mr"). This matches the existing behavior in hive from HIVE-9482.
## How was this patch tested?
Unit test added, existing tests run via jenkins.
Author: Imran Rashid <irashid@cloudera.com>
Author: Henry Robinson <henry@apache.org>
Closes#19769 from squito/SPARK-12297_skip_conversion.
## What changes were proposed in this pull request?
#19416 changed the format in which rows were encoded in the state store. However, this can break existing streaming queries with the old format in unpredictable ways (potentially crashing the JVM). Hence I am reverting this for now. This will be re-applied in the future after we start saving more metadata in checkpoints to signify which version of state row format the existing streaming query is running. Then we can decode old and new formats accordingly.
## How was this patch tested?
Existing tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#19924 from tdas/SPARK-22187-1.
## What changes were proposed in this pull request?
This PR support for pushing down filters for DateType in ORC
## How was this patch tested?
Pass the Jenkins with newly add and updated test cases.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#18995 from dongjoon-hyun/SPARK-21787.
…a-2.12 and JDK9
## What changes were proposed in this pull request?
Some compile error after upgrading to scala-2.12
```javascript
spark_source/core/src/main/scala/org/apache/spark/executor/Executor.scala:455: ambiguous reference to overloaded definition, method limit in class ByteBuffer of type (x$1: Int)java.nio.ByteBuffer
method limit in class Buffer of type ()Int
match expected type ?
val resultSize = serializedDirectResult.limit
error
```
The limit method was moved from ByteBuffer to the superclass Buffer and it can no longer be called without (). The same reason for position method.
```javascript
/home/zly/prj/oss/jdk9_HOS_SOURCE/spark_source/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala:427: ambiguous reference to overloaded definition, [error] both method putAll in class Properties of type (x$1: java.util.Map[_, _])Unit [error] and method putAll in class Hashtable of type (x$1: java.util.Map[_ <: Object, _ <: Object])Unit [error] match argument types (java.util.Map[String,String])
[error] props.putAll(outputSerdeProps.toMap.asJava)
[error] ^
```
This is because the key type is Object instead of String which is unsafe.
## How was this patch tested?
running tests
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: kellyzly <kellyzly@126.com>
Closes#19854 from kellyzly/SPARK-22660.
## What changes were proposed in this pull request?
To support vectorization in native OrcFileFormat later, we need to use `buildReaderWithPartitionValues` instead of `buildReader` like ParquetFileFormat. This PR replaces `buildReader` with `buildReaderWithPartitionValues`.
## How was this patch tested?
Pass the Jenkins with the existing test cases.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19907 from dongjoon-hyun/SPARK-ORC-BUILD-READER.
- Implemented methods getInt, getLong, getBoolean for DataSourceV2Options
- Added new unit tests to exercise these methods
Author: Sunitha Kambhampati <skambha@us.ibm.com>
Closes#19902 from skambha/spark22452.
## What changes were proposed in this pull request?
Similar to https://github.com/apache/spark/pull/19842 , we should also make `ColumnarRow` an immutable view, and move forward to make `ColumnVector` public.
## How was this patch tested?
Existing tests.
The performance concern should be same as https://github.com/apache/spark/pull/19842 .
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19898 from cloud-fan/row-id.
## What changes were proposed in this pull request?
Since SPARK-20682, we have two `OrcFileFormat`s. This PR refactors ORC tests with three principles (with a few exceptions)
1. Move test suite into `sql/core`.
2. Create `HiveXXX` test suite in `sql/hive` by reusing `sql/core` test suite.
3. `OrcTest` will provide common helper functions and `val orcImp: String`.
**Test Suites**
*Native OrcFileFormat*
- org.apache.spark.sql.hive.orc
- OrcFilterSuite
- OrcPartitionDiscoverySuite
- OrcQuerySuite
- OrcSourceSuite
- o.a.s.sql.hive.orc
- OrcHadoopFsRelationSuite
*Hive built-in OrcFileFormat*
- o.a.s.sql.hive.orc
- HiveOrcFilterSuite
- HiveOrcPartitionDiscoverySuite
- HiveOrcQuerySuite
- HiveOrcSourceSuite
- HiveOrcHadoopFsRelationSuite
**Hierarchy**
```
OrcTest
-> OrcSuite
-> OrcSourceSuite
-> OrcQueryTest
-> OrcQuerySuite
-> OrcPartitionDiscoveryTest
-> OrcPartitionDiscoverySuite
-> OrcFilterSuite
HadoopFsRelationTest
-> OrcHadoopFsRelationSuite
-> HiveOrcHadoopFsRelationSuite
```
Please note the followings.
- Unlike the other test suites, `OrcHadoopFsRelationSuite` doesn't inherit `OrcTest`. It is inside `sql/hive` like `ParquetHadoopFsRelationSuite` due to the dependencies and follows the existing convention to use `val dataSourceName: String`
- `OrcFilterSuite`s cannot reuse test cases due to the different function signatures using Hive 1.2.1 ORC classes and Apache ORC 1.4.1 classes.
## How was this patch tested?
Pass the Jenkins tests with reorganized test suites.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19882 from dongjoon-hyun/SPARK-22672.
## What changes were proposed in this pull request?
There was a bug in Univocity Parser that causes the issue in SPARK-22516. This was fixed by upgrading from 2.5.4 to 2.5.9 version of the library :
**Executing**
```
spark.read.option("header","true").option("inferSchema", "true").option("multiLine", "true").option("comment", "g").csv("test_file_without_eof_char.csv").show()
```
**Before**
```
ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 6)
com.univocity.parsers.common.TextParsingException: java.lang.IllegalArgumentException - Unable to skip 1 lines from line 2. End of input reached
...
Internal state when error was thrown: line=3, column=0, record=2, charIndex=31
at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:339)
at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:475)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anon$1.next(UnivocityParser.scala:281)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
```
**After**
```
+-------+-------+
|column1|column2|
+-------+-------+
| abc| def|
+-------+-------+
```
## How was this patch tested?
The already existing `CSVSuite.commented lines in CSV data` test was extended to parse the file also in multiline mode. The test input file was modified to also include a comment in the last line.
Author: smurakozi <smurakozi@gmail.com>
Closes#19906 from smurakozi/SPARK-22516.
## What changes were proposed in this pull request?
This is a follow-up of https://github.com/apache/spark/pull/19871 to improve an exception message.
## How was this patch tested?
Pass the Jenkins.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19903 from dongjoon-hyun/orc_exception.
## What changes were proposed in this pull request?
The SQL `Analyzer` goes through a whole query plan even most part of it is analyzed. This increases the time spent on query analysis for long pipelines in ML, especially.
This patch adds a logical node called `AnalysisBarrier` that wraps an analyzed logical plan to prevent it from analysis again. The barrier is applied to the analyzed logical plan in `Dataset`. It won't change the output of wrapped logical plan and just acts as a wrapper to hide it from analyzer. New operations on the dataset will be put on the barrier, so only the new nodes created will be analyzed.
This analysis barrier will be removed at the end of analysis stage.
## How was this patch tested?
Added tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19873 from viirya/SPARK-20392-reopen.
## What changes were proposed in this pull request?
During [SPARK-22488](https://github.com/apache/spark/pull/19713) to fix view resolution issue, there occurs a regression at `2.2.1` and `master` branch like the following. This PR fixes that.
```scala
scala> spark.version
res2: String = 2.2.1
scala> sql("DROP TABLE IF EXISTS t").show
17/12/04 21:01:06 WARN DropTableCommand: org.apache.spark.sql.AnalysisException:
Table or view not found: t;
org.apache.spark.sql.AnalysisException: Table or view not found: t;
```
## How was this patch tested?
Manual.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19888 from dongjoon-hyun/SPARK-22686.
## What changes were proposed in this pull request?
This PR aims to provide a configuration to choose the default `OrcFileFormat` from legacy `sql/hive` module or new `sql/core` module.
For example, this configuration will affects the following operations.
```scala
spark.read.orc(...)
```
```sql
CREATE TABLE t
USING ORC
...
```
## How was this patch tested?
Pass the Jenkins with new test suites.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19871 from dongjoon-hyun/spark-sql-orc-enabled.
## What changes were proposed in this pull request?
PropagateTypes are called twice in TypeCoercion. We do not need to call it twice. Instead, we should call it after each change on the types.
## How was this patch tested?
The existing tests
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19874 from gatorsmile/deduplicatePropagateTypes.
## What changes were proposed in this pull request?
The `HashAggregateExec` whole stage codegen path is a little messy and hard to understand, this code cleans it up a little bit, especially for the fast hash map part.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19869 from cloud-fan/hash-agg.
## What changes were proposed in this pull request?
Since [SPARK-2883](https://issues.apache.org/jira/browse/SPARK-2883), Apache Spark supports Apache ORC inside `sql/hive` module with Hive dependency. This PR aims to add a new ORC data source inside `sql/core` and to replace the old ORC data source eventually. This PR resolves the following three issues.
- [SPARK-20682](https://issues.apache.org/jira/browse/SPARK-20682): Add new ORCFileFormat based on Apache ORC 1.4.1
- [SPARK-15474](https://issues.apache.org/jira/browse/SPARK-15474): ORC data source fails to write and read back empty dataframe
- [SPARK-21791](https://issues.apache.org/jira/browse/SPARK-21791): ORC should support column names with dot
## How was this patch tested?
Pass the Jenkins with the existing all tests and new tests for SPARK-15474 and SPARK-21791.
Author: Dongjoon Hyun <dongjoon@apache.org>
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19651 from dongjoon-hyun/SPARK-20682.
## What changes were proposed in this pull request?
Use a separate Spark event queue for StreamingQueryListenerBus so that if there are many non-streaming events, streaming query listeners don't need to wait for other Spark listeners and can catch up.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <zsxwing@gmail.com>
Closes#19838 from zsxwing/SPARK-22638.
## What changes were proposed in this pull request?
When user tries to load data with a non existing hdfs file path system is not validating it and the load command operation is getting successful.
This is misleading to the user. already there is a validation in the scenario of none existing local file path. This PR has added validation in the scenario of nonexisting hdfs file path
## How was this patch tested?
UT has been added for verifying the issue, also snapshots has been added after the verification in a spark yarn cluster
Author: sujith71955 <sujithchacko.2010@gmail.com>
Closes#19823 from sujith71955/master_LoadComand_Issue.
## What changes were proposed in this pull request?
This PR introduces a way to explicitly range-partition a Dataset. So far, only round-robin and hash partitioning were possible via `df.repartition(...)`, but sometimes range partitioning might be desirable: e.g. when writing to disk, for better compression without the cost of global sort.
The current implementation piggybacks on the existing `RepartitionByExpression` `LogicalPlan` and simply adds the following logic: If its expressions are of type `SortOrder`, then it will do `RangePartitioning`; otherwise `HashPartitioning`. This was by far the least intrusive solution I could come up with.
## How was this patch tested?
Unit test for `RepartitionByExpression` changes, a test to ensure we're not changing the behavior of existing `.repartition()` and a few end-to-end tests in `DataFrameSuite`.
Author: Adrian Ionescu <adrian@databricks.com>
Closes#19828 from adrian-ionescu/repartitionByRange.
## What changes were proposed in this pull request?
How to reproduce:
```scala
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value").createTempView("table1")
spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value").createTempView("table2")
val bl = sql("SELECT /*+ MAPJOIN(t1) */ * FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key").queryExecution.executedPlan
println(bl.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide)
```
The result is `BuildRight`, but should be `BuildLeft`. This PR fix this issue.
## How was this patch tested?
unit tests
Author: Yuming Wang <wgyumg@gmail.com>
Closes#19714 from wangyum/SPARK-22489.
## What changes were proposed in this pull request?
To make `ColumnVector` public, `ColumnarArray` need to be public too, and we should not have mutable public fields in a public class. This PR proposes to make `ColumnarArray` an immutable view of the data, and always create a new instance of `ColumnarArray` in `ColumnVector#getArray`
## How was this patch tested?
new benchmark in `ColumnarBatchBenchmark`
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19842 from cloud-fan/column-vector.
## What changes were proposed in this pull request?
As a step to make `ColumnVector` public, the `ColumnarRow` returned by `ColumnVector#getStruct` should be immutable.
However we do need the mutability of `ColumnaRow` for the fast vectorized hashmap in hash aggregate. To solve this, this PR introduces a `MutableColumnarRow` for this use case.
## How was this patch tested?
existing test.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19847 from cloud-fan/mutable-row.
## What changes were proposed in this pull request?
Currently, in the optimize rule `PropagateEmptyRelation`, the following cases is not handled:
1. empty relation as right child in left outer join
2. empty relation as left child in right outer join
3. empty relation as right child in left semi join
4. empty relation as right child in left anti join
5. only one empty relation in full outer join
case 1 / 2 / 5 can be treated as **Cartesian product** and cause exception. See the new test cases.
## How was this patch tested?
Unit test
Author: Wang Gengliang <ltnwgl@gmail.com>
Closes#19825 from gengliangwang/SPARK-22615.
## What changes were proposed in this pull request?
For SQL write jobs, we only set metrics for the SQL listener and display them in the SQL plan UI. We should also set metrics for Spark task output metrics, which will be shown in spark job UI.
## How was this patch tested?
test it manually. For a simple write job
```
spark.range(1000).write.parquet("/tmp/p1")
```
now the spark job UI looks like
![ui](https://user-images.githubusercontent.com/3182036/33326478-05a25b7c-d490-11e7-96ef-806117774356.jpg)
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19833 from cloud-fan/ui.
## What changes were proposed in this pull request?
`CatalogImpl.refreshTable` uses `foreach(..)` to refresh all tables in a view. This traverses all nodes in the subtree and calls `LogicalPlan.refresh()` on these nodes. However `LogicalPlan.refresh()` is also refreshing its children, as a result refreshing a large view can be quite expensive.
This PR just calls `LogicalPlan.refresh()` on the top node.
## How was this patch tested?
Existing tests.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes#19837 from hvanhovell/SPARK-22637.
## What changes were proposed in this pull request?
* JIRA: [SPARK-22431](https://issues.apache.org/jira/browse/SPARK-22431) : Creating Permanent view with illegal type
**Description:**
- It is possible in Spark SQL to create a permanent view that uses an nested field with an illegal name.
- For example if we create the following view:
```create view x as select struct('a' as `$q`, 1 as b) q```
- A simple select fails with the following exception:
```
select * from x;
org.apache.spark.SparkException: Cannot recognize hive type string: struct<$q:string,b:int>
at org.apache.spark.sql.hive.client.HiveClientImpl$.fromHiveColumn(HiveClientImpl.scala:812)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:378)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:378)
...
```
**Issue/Analysis**: Right now, we can create a view with a schema that cannot be read back by Spark from the Hive metastore. For more details, please see the discussion about the analysis and proposed fix options in comment 1 and comment 2 in the [SPARK-22431](https://issues.apache.org/jira/browse/SPARK-22431)
**Proposed changes**:
- Fix the hive table/view codepath to check whether the schema datatype is parseable by Spark before persisting it in the metastore. This change is localized to HiveClientImpl to do the check similar to the check in FromHiveColumn. This is fail-fast and we will avoid the scenario where we write something to the metastore that we are unable to read it back.
- Added new unit tests
- Ran the sql related unit test suites ( hive/test, sql/test, catalyst/test) OK
With the fix:
```
create view x as select struct('a' as `$q`, 1 as b) q;
17/11/28 10:44:55 ERROR SparkSQLDriver: Failed in [create view x as select struct('a' as `$q`, 1 as b) q]
org.apache.spark.SparkException: Cannot recognize hive type string: struct<$q:string,b:int>
at org.apache.spark.sql.hive.client.HiveClientImpl$.org$apache$spark$sql$hive$client$HiveClientImpl$$getSparkSQLDataType(HiveClientImpl.scala:884)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$org$apache$spark$sql$hive$client$HiveClientImpl$$verifyColumnDataType$1.apply(HiveClientImpl.scala:906)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$org$apache$spark$sql$hive$client$HiveClientImpl$$verifyColumnDataType$1.apply(HiveClientImpl.scala:906)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
...
```
## How was this patch tested?
- New unit tests have been added.
hvanhovell, Please review and share your thoughts/comments. Thank you so much.
Author: Sunitha Kambhampati <skambha@us.ibm.com>
Closes#19747 from skambha/spark22431.
## What changes were proposed in this pull request?
Currently, relation size is computed as the sum of file size, which is error-prone because storage format like parquet may have a much smaller file size compared to in-memory size. When we choose broadcast join based on file size, there's a risk of OOM. But if the number of rows is available in statistics, we can get a better estimation by `numRows * rowSize`, which helps to alleviate this problem.
## How was this patch tested?
Added a new test case for data source table and hive table.
Author: Zhenhua Wang <wzh_zju@163.com>
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#19743 from wzhfy/better_leaf_size.
## What changes were proposed in this pull request?
When converting Pandas DataFrame/Series from/to Spark DataFrame using `toPandas()` or pandas udfs, timestamp values behave to respect Python system timezone instead of session timezone.
For example, let's say we use `"America/Los_Angeles"` as session timezone and have a timestamp value `"1970-01-01 00:00:01"` in the timezone. Btw, I'm in Japan so Python timezone would be `"Asia/Tokyo"`.
The timestamp value from current `toPandas()` will be the following:
```
>>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>> df = spark.createDataFrame([28801], "long").selectExpr("timestamp(value) as ts")
>>> df.show()
+-------------------+
| ts|
+-------------------+
|1970-01-01 00:00:01|
+-------------------+
>>> df.toPandas()
ts
0 1970-01-01 17:00:01
```
As you can see, the value becomes `"1970-01-01 17:00:01"` because it respects Python timezone.
As we discussed in #18664, we consider this behavior is a bug and the value should be `"1970-01-01 00:00:01"`.
## How was this patch tested?
Added tests and existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19607 from ueshin/issues/SPARK-22395.
## What changes were proposed in this pull request?
In PySpark API Document, DataFrame.write.csv() says that setting the quote parameter to an empty string should turn off quoting. Instead, it uses the [null character](https://en.wikipedia.org/wiki/Null_character) as the quote.
This PR fixes the doc.
## How was this patch tested?
Manual.
```
cd python/docs
make html
open _build/html/pyspark.sql.html
```
Author: gaborgsomogyi <gabor.g.somogyi@gmail.com>
Closes#19814 from gaborgsomogyi/SPARK-22484.
## What changes were proposed in this pull request?
Code generation is disabled for CaseWhen when the number of branches is higher than `spark.sql.codegen.maxCaseBranches` (which defaults to 20). This was done to prevent the well known 64KB method limit exception.
This PR proposes to support code generation also in those cases (without causing exceptions of course). As a side effect, we could get rid of the `spark.sql.codegen.maxCaseBranches` configuration.
## How was this patch tested?
existing UTs
Author: Marco Gaido <mgaido@hortonworks.com>
Author: Marco Gaido <marcogaido91@gmail.com>
Closes#19752 from mgaido91/SPARK-22520.
## What changes were proposed in this pull request?
Currently, relation stats is the same whether cbo is enabled or not. While relation (`LogicalRelation` or `HiveTableRelation`) is a `LogicalPlan`, its behavior is inconsistent with other plans. This can cause confusion when user runs EXPLAIN COST commands. Besides, when CBO is disabled, we apply the size-only estimation strategy, so there's no need to propagate other catalog statistics to relation.
## How was this patch tested?
Enhanced existing tests case and added a test case.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Closes#19757 from wzhfy/catalog_stats_conversion.
## What changes were proposed in this pull request?
`ColumnVector#loadBytes` is only used as an optimization for reading UTF8String in `WritableColumnVector`, this PR moves this optimization to `WritableColumnVector` and simplified it.
## How was this patch tested?
existing test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19815 from cloud-fan/load-bytes.
## What changes were proposed in this pull request?
`nullsNativeAddress` and `valuesNativeAddress` are only used in tests and benchmark, no need to be top class API.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19818 from cloud-fan/minor.
## What changes were proposed in this pull request?
`ctx.currentVars` means the input variables for the current operator, which is already decided in `CodegenSupport`, we can set it there instead of `doConsume`.
also add more comments to help people understand the codegen framework.
After this PR, we now have a principle about setting `ctx.currentVars` and `ctx.INPUT_ROW`:
1. for non-whole-stage-codegen path, never set them. (permit some special cases like generating ordering)
2. for whole-stage-codegen `produce` path, mostly we don't need to set them, but blocking operators may need to set them for expressions that produce data from data source, sort buffer, aggregate buffer, etc.
3. for whole-stage-codegen `consume` path, mostly we don't need to set them because `currentVars` is automatically set to child input variables and `INPUT_ROW` is mostly not used. A few plans need to tweak them as they may have different inputs, or they use the input row.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19803 from cloud-fan/codegen.
## What changes were proposed in this pull request?
A frequently reported issue of Spark is the Java 64kb compile error. This is because Spark generates a very big method and it's usually caused by 3 reasons:
1. a deep expression tree, e.g. a very complex filter condition
2. many individual expressions, e.g. expressions can have many children, operators can have many expressions.
3. a deep query plan tree (with whole stage codegen)
This PR focuses on 1. There are already several patches(#15620#18972#18641) trying to fix this issue and some of them are already merged. However this is an endless job as every non-leaf expression has this issue.
This PR proposes to fix this issue in `Expression.genCode`, to make sure the code for a single expression won't grow too big.
According to maropu 's benchmark, no regression is found with TPCDS (thanks maropu !): https://docs.google.com/spreadsheets/d/1K3_7lX05-ZgxDXi9X_GleNnDjcnJIfoSlSCDZcL4gdg/edit?usp=sharing
## How was this patch tested?
existing test
Author: Wenchen Fan <wenchen@databricks.com>
Author: Wenchen Fan <cloud0fan@gmail.com>
Closes#19767 from cloud-fan/codegen.
## What changes were proposed in this pull request?
Let’s say I have a nested AND expression shown below and p2 can not be pushed down,
(p1 AND p2) OR p3
In current Spark code, during data source filter translation, (p1 AND p2) is returned as p1 only and p2 is simply lost. This issue occurs with JDBC data source and is similar to [SPARK-12218](https://github.com/apache/spark/pull/10362) for Parquet. When we have AND nested below another expression, we should either push both legs or nothing.
Note that:
- The current Spark code will always split conjunctive predicate before it determines if a predicate can be pushed down or not
- If I have (p1 AND p2) AND p3, it will be split into p1, p2, p3. There won't be nested AND expression.
- The current Spark code logic for OR is OK. It either pushes both legs or nothing.
The same translation method is also called by Data Source V2.
## How was this patch tested?
Added new unit test cases to JDBCSuite
gatorsmile
Author: Jia Li <jiali@us.ibm.com>
Closes#19776 from jliwork/spark-22548.
## What changes were proposed in this pull request?
Added the histogram representation to the output of the `DESCRIBE EXTENDED table_name column_name` command.
## How was this patch tested?
Modified SQL UT and checked output
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Marco Gaido <mgaido@hortonworks.com>
Closes#19774 from mgaido91/SPARK-22475.
## What changes were proposed in this pull request?
This PR is to clean the usage of addMutableState and splitExpressions
1. replace hardcoded type string to ctx.JAVA_BOOLEAN etc.
2. create a default value of the initCode for ctx.addMutableStats
3. Use named arguments when calling `splitExpressions `
## How was this patch tested?
The existing test cases
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19790 from gatorsmile/codeClean.
This PR enables to use ``OffHeapColumnVector`` when ``spark.sql.columnVector.offheap.enable`` is set to ``true``. While ``ColumnVector`` has two implementations ``OnHeapColumnVector`` and ``OffHeapColumnVector``, only ``OnHeapColumnVector`` is always used.
This PR implements the followings
- Pass ``OffHeapColumnVector`` to ``ColumnarBatch.allocate()`` when ``spark.sql.columnVector.offheap.enable`` is set to ``true``
- Free all of off-heap memory regions by ``OffHeapColumnVector.close()``
- Ensure to call ``OffHeapColumnVector.close()``
Use existing tests
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#17436 from kiszk/SPARK-20101.
## What changes were proposed in this pull request?
ScalaTest 3.0 uses an implicit `Signaler`. This PR makes it sure all Spark tests uses `ThreadSignaler` explicitly which has the same default behavior of interrupting a thread on the JVM like ScalaTest 2.2.x. This will reduce potential flakiness.
## How was this patch tested?
This is testsuite-only update. This should passes the Jenkins tests.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19784 from dongjoon-hyun/use_thread_signaler.
## What changes were proposed in this pull request?
Pass the FileSystem created using the correct Hadoop conf into `globPathIfNecessary` so that it can pick up user's hadoop configurations, such as credentials.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <zsxwing@gmail.com>
Closes#19771 from zsxwing/fix-file-stream-conf.
## What changes were proposed in this pull request?
* Add a "function type" argument to pandas_udf.
* Add a new public enum class `PandasUdfType` in pyspark.sql.functions
* Refactor udf related code from pyspark.sql.functions to pyspark.sql.udf
* Merge "PythonUdfType" and "PythonEvalType" into a single enum class "PythonEvalType"
Example:
```
from pyspark.sql.functions import pandas_udf, PandasUDFType
pandas_udf('double', PandasUDFType.SCALAR):
def plus_one(v):
return v + 1
```
## Design doc
https://docs.google.com/document/d/1KlLaa-xJ3oz28xlEJqXyCAHU3dwFYkFs_ixcUXrJNTc/edit
## How was this patch tested?
Added PandasUDFTests
## TODO:
* [x] Implement proper enum type for `PandasUDFType`
* [x] Update documentation
* [x] Add more tests in PandasUDFTests
Author: Li Jin <ice.xelloss@gmail.com>
Closes#19630 from icexelloss/spark-22409-pandas-udf-type.
## What changes were proposed in this pull request?
`ColumnarBatch` provides features to do fast filter and project in a columnar fashion, however this feature is never used by Spark, as Spark uses whole stage codegen and processes the data in a row fashion. This PR proposes to remove these unused features as we won't switch to columnar execution in the near future. Even we do, I think this part needs a proper redesign.
This is also a step to make `ColumnVector` public, as we don't wanna expose these features to users.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19766 from cloud-fan/vector.
## What changes were proposed in this pull request?
Do not include jdbc properties which may contain credentials in logging a logical plan with `SaveIntoDataSourceCommand` in it.
## How was this patch tested?
building locally and trying to reproduce (per the steps in https://issues.apache.org/jira/browse/SPARK-22479):
```
== Parsed Logical Plan ==
SaveIntoDataSourceCommand org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider570127fa, Map(dbtable -> test20, driver -> org.postgresql.Driver, url -> *********(redacted), password -> *********(redacted)), ErrorIfExists
+- Range (0, 100, step=1, splits=Some(8))
== Analyzed Logical Plan ==
SaveIntoDataSourceCommand org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider570127fa, Map(dbtable -> test20, driver -> org.postgresql.Driver, url -> *********(redacted), password -> *********(redacted)), ErrorIfExists
+- Range (0, 100, step=1, splits=Some(8))
== Optimized Logical Plan ==
SaveIntoDataSourceCommand org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider570127fa, Map(dbtable -> test20, driver -> org.postgresql.Driver, url -> *********(redacted), password -> *********(redacted)), ErrorIfExists
+- Range (0, 100, step=1, splits=Some(8))
== Physical Plan ==
Execute SaveIntoDataSourceCommand
+- SaveIntoDataSourceCommand org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider570127fa, Map(dbtable -> test20, driver -> org.postgresql.Driver, url -> *********(redacted), password -> *********(redacted)), ErrorIfExists
+- Range (0, 100, step=1, splits=Some(8))
```
Author: osatici <osatici@palantir.com>
Closes#19708 from onursatici/os/redact-jdbc-creds.
## What changes were proposed in this pull request?
This fixes a problem caused by #15880
`select '1.5' > 0.5; // Result is NULL in Spark but is true in Hive.
`
When compare string and numeric, cast them as double like Hive.
Author: liutang123 <liutang123@yeah.net>
Closes#19692 from liutang123/SPARK-22469.
## What changes were proposed in this pull request?
Logically the `Array` doesn't belong to `ColumnVector`, and `Row` doesn't belong to `ColumnarBatch`. e.g. `ColumnVector` needs to return `Array` for `getArray`, and `Row` for `getStruct`. `Array` and `Row` can return each other with the `getArray`/`getStruct` methods.
This is also a step to make `ColumnVector` public, it's cleaner to have `Array` and `Row` as top-level classes.
This PR is just code moving around, with 2 renaming: `Array` -> `VectorBasedArray`, `Row` -> `VectorBasedRow`.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19740 from cloud-fan/vector.
This change replaces the SQLListener with a new implementation that
saves the data to the same store used by the SparkContext's status
store. For that, the types used by the old SQLListener had to be
updated a bit so that they're more serialization-friendly.
The interface for getting data from the store was abstracted into
a new class, SQLAppStatusStore (following the convention used in
core).
Another change is the way that the SQL UI hooks up into the core
UI or the SHS. The old "SparkHistoryListenerFactory" was replaced
with a new "AppStatePlugin" that more explicitly differentiates
between the two use cases: processing events, and showing the UI.
Both live apps and the SHS use this new API (previously, it was
restricted to the SHS).
Note on the above: this causes a slight change of behavior for
live apps; the SQL tab will only show up after the first execution
is started.
The metrics gathering code was re-worked a bit so that the types
used are less memory hungry and more serialization-friendly. This
reduces memory usage when using in-memory stores, and reduces load
times when using disk stores.
Tested with existing and added unit tests. Note one unit test was
disabled because it depends on SPARK-20653, which isn't in yet.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19681 from vanzin/SPARK-20652.
## What changes were proposed in this pull request?
Equi-height histogram is effective in cardinality estimation, and more accurate than basic column stats (min, max, ndv, etc) especially in skew distribution. So we need to support it.
For equi-height histogram, all buckets (intervals) have the same height (frequency).
In this PR, we use a two-step method to generate an equi-height histogram:
1. use `ApproximatePercentile` to get percentiles `p(0), p(1/n), p(2/n) ... p((n-1)/n), p(1)`;
2. construct range values of buckets, e.g. `[p(0), p(1/n)], [p(1/n), p(2/n)] ... [p((n-1)/n), p(1)]`, and use `ApproxCountDistinctForIntervals` to count ndv in each bucket. Each bucket is of the form: `(lowerBound, higherBound, ndv)`.
## How was this patch tested?
Added new test cases and modified some existing test cases.
Author: Zhenhua Wang <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wzh_zju@163.com>
Closes#19479 from wzhfy/generate_histogram.
## What changes were proposed in this pull request?
There is a concern that Spark-side codegen row-by-row filtering might be faster than Parquet's one in general due to type-boxing and additional fuction calls which Spark's one tries to avoid.
So, this PR adds an option to disable/enable record-by-record filtering in Parquet side.
It sets the default to `false` to take the advantage of the improvement.
This was also discussed in https://github.com/apache/spark/pull/14671.
## How was this patch tested?
Manually benchmarks were performed. I generated a billion (1,000,000,000) records and tested equality comparison concatenated with `OR`. This filter combinations were made from 5 to 30.
It seem indeed Spark-filtering is faster in the test case and the gap increased as the filter tree becomes larger.
The details are as below:
**Code**
``` scala
test("Parquet-side filter vs Spark-side filter - record by record") {
withTempPath { path =>
val N = 1000 * 1000 * 1000
val df = spark.range(N).toDF("a")
df.write.parquet(path.getAbsolutePath)
val benchmark = new Benchmark("Parquet-side vs Spark-side", N)
Seq(5, 10, 20, 30).foreach { num =>
val filterExpr = (0 to num).map(i => s"a = $i").mkString(" OR ")
benchmark.addCase(s"Parquet-side filter - number of filters [$num]", 3) { _ =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> false.toString,
SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> true.toString) {
// We should strip Spark-side filter to compare correctly.
stripSparkFilter(
spark.read.parquet(path.getAbsolutePath).filter(filterExpr)).count()
}
}
benchmark.addCase(s"Spark-side filter - number of filters [$num]", 3) { _ =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> false.toString,
SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> false.toString) {
spark.read.parquet(path.getAbsolutePath).filter(filterExpr).count()
}
}
}
benchmark.run()
}
}
```
**Result**
```
Parquet-side vs Spark-side: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Parquet-side filter - number of filters [5] 4268 / 4367 234.3 4.3 0.8X
Spark-side filter - number of filters [5] 3709 / 3741 269.6 3.7 0.9X
Parquet-side filter - number of filters [10] 5673 / 5727 176.3 5.7 0.6X
Spark-side filter - number of filters [10] 3588 / 3632 278.7 3.6 0.9X
Parquet-side filter - number of filters [20] 8024 / 8440 124.6 8.0 0.4X
Spark-side filter - number of filters [20] 3912 / 3946 255.6 3.9 0.8X
Parquet-side filter - number of filters [30] 11936 / 12041 83.8 11.9 0.3X
Spark-side filter - number of filters [30] 3929 / 3978 254.5 3.9 0.8X
```
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#15049 from HyukjinKwon/SPARK-17310.
## What changes were proposed in this pull request?
This change uses Arrow to optimize the creation of a Spark DataFrame from a Pandas DataFrame. The input df is sliced according to the default parallelism. The optimization is enabled with the existing conf "spark.sql.execution.arrow.enabled" and is disabled by default.
## How was this patch tested?
Added new unit test to create DataFrame with and without the optimization enabled, then compare results.
Author: Bryan Cutler <cutlerb@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19459 from BryanCutler/arrow-createDataFrame-from_pandas-SPARK-20791.
## What changes were proposed in this pull request?
This PR changes `AND` or `OR` code generation to place condition and then expressions' generated code into separated methods if these size could be large. When the method is newly generated, variables for `isNull` and `value` are declared as an instance variable to pass these values (e.g. `isNull1409` and `value1409`) to the callers of the generated method.
This PR resolved two cases:
* large code size of left expression
* large code size of right expression
## How was this patch tested?
Added a new test case into `CodeGenerationSuite`
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#18972 from kiszk/SPARK-21720.
## What changes were proposed in this pull request?
This PR makes Spark to be able to read Parquet TIMESTAMP_MICROS values, and add a new config to allow Spark to write timestamp values to parquet as TIMESTAMP_MICROS type.
## How was this patch tested?
new test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19702 from cloud-fan/parquet.
## What changes were proposed in this pull request?
The current internal `table()` API of `SparkSession` bypasses the Analyzer and directly calls `sessionState.catalog.lookupRelation` API. This skips the view resolution logics in our Analyzer rule `ResolveRelations`. This internal API is widely used by various DDL commands, public and internal APIs.
Users might get the strange error caused by view resolution when the default database is different.
```
Table or view not found: t1; line 1 pos 14
org.apache.spark.sql.AnalysisException: Table or view not found: t1; line 1 pos 14
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
```
This PR is to fix it by enforcing it to use `ResolveRelations` to resolve the table.
## How was this patch tested?
Added a test case and modified the existing test cases
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19713 from gatorsmile/viewResolution.
## What changes were proposed in this pull request?
For the few Dataset actions such as `foreach`, currently no SQL metrics are visible in the SQL tab of SparkUI. It is because it binds wrongly to Dataset's `QueryExecution`. As the actions directly evaluate on the RDD which has individual `QueryExecution`, to show correct SQL metrics on UI, we should bind to RDD's `QueryExecution`.
## How was this patch tested?
Manually test. Screenshot is attached in the PR.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19689 from viirya/SPARK-22462.
## What changes were proposed in this pull request?
Fix to allow recovery on console , avoid checkpoint exception
## How was this patch tested?
existing tests
manual tests [ Replicating error and seeing no checkpoint error after fix]
Author: Rekha Joshi <rekhajoshm@gmail.com>
Author: rjoshi2 <rekhajoshm@gmail.com>
Closes#19407 from rekhajoshm/SPARK-21667.
## What changes were proposed in this pull request?
In `spark-sql` module tests there are deprecations warnings caused by the usage of deprecated methods of `java.sql.Date` and the usage of the deprecated `AsyncAssertions.Waiter` class.
This PR replace the deprecated methods of `java.sql.Date` with non-deprecated ones (using `Calendar` where needed). It replaces also the deprecated `org.scalatest.concurrent.AsyncAssertions.Waiter` with `org.scalatest.concurrent.Waiters._`.
## How was this patch tested?
existing UTs
Author: Marco Gaido <mgaido@hortonworks.com>
Closes#19696 from mgaido91/SPARK-22473.
## What changes were proposed in this pull request?
One powerful feature of `Dataset` is, we can easily map SQL rows to Scala/Java objects and do runtime null check automatically.
For example, let's say we have a parquet file with schema `<a: int, b: string>`, and we have a `case class Data(a: Int, b: String)`. Users can easily read this parquet file into `Data` objects, and Spark will throw NPE if column `a` has null values.
However the null checking is left behind for top-level primitive values. For example, let's say we have a parquet file with schema `<a: Int>`, and we read it into Scala `Int`. If column `a` has null values, we will get some weird results.
```
scala> val ds = spark.read.parquet(...).as[Int]
scala> ds.show()
+----+
|v |
+----+
|null|
|1 |
+----+
scala> ds.collect
res0: Array[Long] = Array(0, 1)
scala> ds.map(_ * 2).show
+-----+
|value|
+-----+
|-2 |
|2 |
+-----+
```
This is because internally Spark use some special default values for primitive types, but never expect users to see/operate these default value directly.
This PR adds null check for top-level primitive values
## How was this patch tested?
new test
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19707 from cloud-fan/bug.
Continuation of PR#19528 (https://github.com/apache/spark/pull/19529#issuecomment-340252119)
The problem with the maven build in the previous PR was the new tests.... the creation of a spark session outside the tests meant there was more than one spark session around at a time.
I was using the spark session outside the tests so that the tests could share data; I've changed it so that each test creates the data anew.
Author: Nathan Kronenfeld <nicole.oresme@gmail.com>
Author: Nathan Kronenfeld <nkronenfeld@uncharted.software>
Closes#19705 from nkronenfeld/alternative-style-tests-2.
## What changes were proposed in this pull request?
Current ML's Bucketizer can only bin a column of continuous features. If a dataset has thousands of of continuous columns needed to bin, we will result in thousands of ML stages. It is inefficient regarding query planning and execution.
We should have a type of bucketizer that can bin a lot of columns all at once. It would need to accept an list of arrays of split points to correspond to the columns to bin, but it might make things more efficient by replacing thousands of stages with just one.
This current approach in this patch is to add a new `MultipleBucketizerInterface` for this purpose. `Bucketizer` now extends this new interface.
### Performance
Benchmarking using the test dataset provided in JIRA SPARK-20392 (blockbuster.csv).
The ML pipeline includes 2 `StringIndexer`s and 1 `MultipleBucketizer` or 137 `Bucketizer`s to bin 137 input columns with the same splits. Then count the time to transform the dataset.
MultipleBucketizer: 3352 ms
Bucketizer: 51512 ms
## How was this patch tested?
Jenkins tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#17819 from viirya/SPARK-20542.
## What changes were proposed in this pull request?
For a class with field name of special characters, e.g.:
```scala
case class MyType(`field.1`: String, `field 2`: String)
```
Although we can manipulate DataFrame/Dataset, the field names are encoded:
```scala
scala> val df = Seq(MyType("a", "b"), MyType("c", "d")).toDF
df: org.apache.spark.sql.DataFrame = [field$u002E1: string, field$u00202: string]
scala> df.as[MyType].collect
res7: Array[MyType] = Array(MyType(a,b), MyType(c,d))
```
It causes resolving problem when we try to convert the data with non-encoded field names:
```scala
spark.read.json(path).as[MyType]
...
[info] org.apache.spark.sql.AnalysisException: cannot resolve '`field$u002E1`' given input columns: [field 2, fie
ld.1];
[info] at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
...
```
We should use decoded field name in Dataset schema.
## How was this patch tested?
Added tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19664 from viirya/SPARK-22442.
## What changes were proposed in this pull request?
This PR proposes to add `errorifexists` to SparkR API and fix the rest of them describing the mode, mainly, in API documentations as well.
This PR also replaces `convertToJSaveMode` to `setWriteMode` so that string as is is passed to JVM and executes:
b034f2565f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala (L72-L82)
and remove the duplication here:
3f958a9992/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala (L187-L194)
## How was this patch tested?
Manually checked the built documentation. These were mainly found by `` grep -r `error` `` and `grep -r 'error'`.
Also, unit tests added in `test_sparkSQL.R`.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#19673 from HyukjinKwon/SPARK-21640-followup.
## What changes were proposed in this pull request?
This PR adds support for a new function called `dayofweek` that returns the day of the week of the given argument as an integer value in the range 1-7, where 1 represents Sunday.
## How was this patch tested?
Unit tests and manual tests.
Author: ptkool <michael.styles@shopify.com>
Closes#19672 from ptkool/day_of_week_function.
## What changes were proposed in this pull request?
`spark.sql.statistics.autoUpdate.size` should be `spark.sql.statistics.size.autoUpdate.enabled`. The previous name is confusing as users may treat it as a size config.
This config is in master branch only, no backward compatibility issue.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19667 from cloud-fan/minor.
## What changes were proposed in this pull request?
clarify exception behaviors for all data source v2 interfaces.
## How was this patch tested?
document change only
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19623 from cloud-fan/data-source-exception.
## What changes were proposed in this pull request?
`CodegenContext.copyResult` is kind of a global status for whole stage codegen. But the tricky part is, it is only used to transfer an information from child to parent when calling the `consume` chain. We have to be super careful in `produce`/`consume`, to set it to true when producing multiple result rows, and set it to false in operators that start new pipeline(like sort).
This PR moves the `copyResult` to `CodegenSupport`, and call it at `WholeStageCodegenExec`. This is much easier to reason about.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19656 from cloud-fan/whole-sage.
…
## What changes were proposed in this pull request?
override JDBCDialects methods quoteIdentifier, getTableExistsQuery and getSchemaQuery in AggregatedDialect
## How was this patch tested?
Test the new implementation in JDBCSuite test("Aggregated dialects")
Author: Huaxin Gao <huaxing@us.ibm.com>
Closes#19658 from huaxingao/spark-22443.
## What changes were proposed in this pull request?
Next fit decreasing bin packing algorithm is used to combine splits in DataSourceScanExec but the comment incorrectly states that first fit decreasing algorithm is used. The current implementation doesn't go back to a previously used bin other than the bin that the last element was put into.
Author: Vinitha Gankidi <vgankidi@netflix.com>
Closes#19634 from vgankidi/SPARK-22412.
## What changes were proposed in this pull request?
When we insert `BatchEvalPython` for Python UDFs into a query plan, if its child has some outputs that are not used by the original parent node, `BatchEvalPython` will still take those outputs and save into the queue. When the data for those outputs are big, it is easily to generate big spill on disk.
For example, the following reproducible code is from the JIRA ticket.
```python
from pyspark.sql.functions import *
from pyspark.sql.types import *
lines_of_file = [ "this is a line" for x in xrange(10000) ]
file_obj = [ "this_is_a_foldername/this_is_a_filename", lines_of_file ]
data = [ file_obj for x in xrange(5) ]
small_df = spark.sparkContext.parallelize(data).map(lambda x : (x[0], x[1])).toDF(["file", "lines"])
exploded = small_df.select("file", explode("lines"))
def split_key(s):
return s.split("/")[1]
split_key_udf = udf(split_key, StringType())
with_filename = exploded.withColumn("filename", split_key_udf("file"))
with_filename.explain(True)
```
The physical plan before/after this change:
Before:
```
*Project [file#0, col#5, pythonUDF0#14 AS filename#9]
+- BatchEvalPython [split_key(file#0)], [file#0, lines#1, col#5, pythonUDF0#14]
+- Generate explode(lines#1), true, false, [col#5]
+- Scan ExistingRDD[file#0,lines#1]
```
After:
```
*Project [file#0, col#5, pythonUDF0#14 AS filename#9]
+- BatchEvalPython [split_key(file#0)], [col#5, file#0, pythonUDF0#14]
+- *Project [col#5, file#0]
+- Generate explode(lines#1), true, false, [col#5]
+- Scan ExistingRDD[file#0,lines#1]
```
Before this change, `lines#1` is a redundant input to `BatchEvalPython`. This patch removes it by adding a Project.
## How was this patch tested?
Manually test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19642 from viirya/SPARK-22410.
## What changes were proposed in this pull request?
Added a test class to check NULL handling behavior.
The expected behavior is defined as the one of the most well-known databases as specified here: https://sqlite.org/nulls.html.
SparkSQL behaves like other DBs:
- Adding anything to null gives null -> YES
- Multiplying null by zero gives null -> YES
- nulls are distinct in SELECT DISTINCT -> NO
- nulls are distinct in a UNION -> NO
- "CASE WHEN null THEN 1 ELSE 0 END" is 0? -> YES
- "null OR true" is true -> YES
- "not (null AND false)" is true -> YES
- null in aggregation are skipped -> YES
## How was this patch tested?
Added test class
Author: Marco Gaido <mgaido@hortonworks.com>
Closes#19653 from mgaido91/SPARK-22418.
forward-port https://github.com/apache/spark/pull/19622 to master branch.
This bug doesn't exist in master because we've added hive bucketing support and the hive bucketing metadata can be recognized by Spark, but we should still port it to master: 1) there may be other unsupported hive metadata removed by Spark. 2) reduce code difference between master and 2.2 to ease the backport in the feature.
***
When we alter table schema, we set the new schema to spark `CatalogTable`, convert it to hive table, and finally call `hive.alterTable`. This causes a problem in Spark 2.2, because hive bucketing metedata is not recognized by Spark, which means a Spark `CatalogTable` representing a hive table is always non-bucketed, and when we convert it to hive table and call `hive.alterTable`, the original hive bucketing metadata will be removed.
To fix this bug, we should read out the raw hive table metadata, update its schema, and call `hive.alterTable`. By doing this we can guarantee only the schema is changed, and nothing else.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#19644 from cloud-fan/infer.
## What changes were proposed in this pull request?
According to the [discussion](https://github.com/apache/spark/pull/19571#issuecomment-339472976) on SPARK-15474, we will add new OrcFileFormat in `sql/core` module and allow users to use both old and new OrcFileFormat.
To do that, `OrcOptions` should be visible in `sql/core` module, too. Previously, it was `private[orc]` in `sql/hive`. This PR removes `private[orc]` because we don't use `private[sql]` in `sql/execution` package after [SPARK-16964](https://github.com/apache/spark/pull/14554).
## How was this patch tested?
Pass the Jenkins with the existing tests.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19636 from dongjoon-hyun/SPARK-22416.
## What changes were proposed in this pull request?
Adding a global limit on top of the distinct values before sorting and collecting will reduce the overall work in the case where we have more distinct values. We will also eagerly perform a collect rather than a take because we know we only have at most (maxValues + 1) rows.
## How was this patch tested?
Existing tests cover sorted order
Author: Patrick Woody <pwoody@palantir.com>
Closes#19629 from pwoody/SPARK-22408.
## What changes were proposed in this pull request?
This patch includes some doc updates for data source API v2. I was reading the code and noticed some minor issues.
## How was this patch tested?
This is a doc only change.
Author: Reynold Xin <rxin@databricks.com>
Closes#19626 from rxin/dsv2-update.
## What changes were proposed in this pull request?
Write HDFSBackedStateStoreProvider.loadMap non-recursively. This prevents stack overflow if too many deltas stack up in a low memory environment.
## How was this patch tested?
existing unit tests for functional equivalence, new unit test to check for stack overflow
Author: Jose Torres <jose@databricks.com>
Closes#19611 from joseph-torres/SPARK-22305.
## What changes were proposed in this pull request?
Both `ReadSupport` and `ReadTask` have a method called `createReader`, but they create different things. This could cause some confusion for data source developers. The same issue exists between `WriteSupport` and `DataWriterFactory`, both of which have a method called `createWriter`. This PR renames the method of `ReadTask`/`DataWriterFactory` to `createDataReader`/`createDataWriter`.
Besides, the name of `RowToInternalRowDataWriterFactory` is not correct, because it actually converts `InternalRow`s to `Row`s. It should be renamed `InternalRowDataWriterFactory`.
## How was this patch tested?
Only renaming, should be covered by existing tests.
Author: Zhenhua Wang <wzh_zju@163.com>
Closes#19610 from wzhfy/rename.
## What changes were proposed in this pull request?
When Hive support is not on, users can hit unresolved plan node when trying to call `INSERT OVERWRITE DIRECTORY` using Hive format.
```
"unresolved operator 'InsertIntoDir true, Storage(Location: /private/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw900000gn/T/spark-b4227606-9311-46a8-8c02-56355bf0e2bc, Serde Library: org.apache.hadoop.hive.ql.io.orc.OrcSerde, InputFormat: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat), hive, true;;
```
This PR is to issue a better error message.
## How was this patch tested?
Added a test case.
Author: gatorsmile <gatorsmile@gmail.com>
Closes#19608 from gatorsmile/hivesupportInsertOverwrite.
## What changes were proposed in this pull request?
In `UnsafeInMemorySorter`, one record may take 32 bytes: 1 `long` for pointer, 1 `long` for key-prefix, and another 2 `long`s as the temporary buffer for radix sort.
In `UnsafeExternalSorter`, we set the `DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD` to be `1024 * 1024 * 1024 / 2`, and hoping the max size of point array to be 8 GB. However this is wrong, `1024 * 1024 * 1024 / 2 * 32` is actually 16 GB, and if we grow the point array before reach this limitation, we may hit the max-page-size error.
Users may see exception like this on large dataset:
```
Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes
at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:241)
at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:121)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
...
```
Setting `DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD` to a smaller number is not enough, users can still set the config to a big number and trigger the too large page size issue. This PR fixes it by explicitly handling the too large page size exception in the sorter and spill.
This PR also change the type of `spark.shuffle.spill.numElementsForceSpillThreshold` to int, because it's only compared with `numRecords`, which is an int. This is an internal conf so we don't have a serious compatibility issue.
## How was this patch tested?
TODO
Author: Wenchen Fan <wenchen@databricks.com>
Closes#18251 from cloud-fan/sort.
## What changes were proposed in this pull request?
This PR fixes the conversion error when reads data from a PostgreSQL table that contains columns of `uuid[]`, `inet[]` and `cidr[]` data types.
For example, create a table with the uuid[] data type, and insert the test data.
```SQL
CREATE TABLE users
(
id smallint NOT NULL,
name character varying(50),
user_ids uuid[],
PRIMARY KEY (id)
)
INSERT INTO users ("id", "name","user_ids")
VALUES (1, 'foo', ARRAY
['7be8aaf8-650e-4dbb-8186-0a749840ecf2'
,'205f9bfc-018c-4452-a605-609c0cfad228']::UUID[]
)
```
Then it will throw the following exceptions when trying to load the data.
```
java.lang.ClassCastException: [Ljava.util.UUID; cannot be cast to [Ljava.lang.String;
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$14.apply(JdbcUtils.scala:459)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$14.apply(JdbcUtils.scala:458)
...
```
## How was this patch tested?
Added test in `PostgresIntegrationSuite`.
Author: Jen-Ming Chung <jenmingisme@gmail.com>
Closes#19567 from jmchung/SPARK-22291.