Commit graph

9617 commits

Author SHA1 Message Date
xy_xin 20cd47e82d [SPARK-32030][SQL] Support unlimited MATCHED and NOT MATCHED clauses in MERGE INTO
### What changes were proposed in this pull request?
This PR add unlimited MATCHED and NOT MATCHED clauses in MERGE INTO statement.

### Why are the changes needed?
Now the MERGE INTO syntax is,
```
MERGE INTO [db_name.]target_table [AS target_alias]
 USING [db_name.]source_table [<time_travel_version>] [AS source_alias]
 ON <merge_condition>
 [ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
 [ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
 [ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]
```
It would be nice if we support unlimited MATCHED and NOT MATCHED clauses in MERGE INTO statement, because users may want to deal with different "AND <condition>"s, the result of which just like a series of "CASE WHEN"s. The expected syntax looks like
```
MERGE INTO [db_name.]target_table [AS target_alias]
 USING [db_name.]source_table [<time_travel_version>] [AS source_alias]
 ON <merge_condition>
 [when_matched_clause [, ...]]
 [when_not_matched_clause [, ...]]
```
where when_matched_clause is
```
WHEN MATCHED [ AND <condition> ] THEN <matched_action>
```
and when_not_matched_clause is
```
WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action>
 ```
matched_action can be one of
```
DELETE
UPDATE SET * or
UPDATE SET col1 = value1 [, col2 = value2, ...]
```
and not_matched_action can be one of
```
INSERT *
INSERT (col1 [, col2, ...]) VALUES (value1 [, value2, ...])
```
### Does this PR introduce _any_ user-facing change?
Yes. The SQL command changes, but it is backward compatible.

### How was this patch tested?
New tests added.

Closes #28875 from xianyinxin/SPARK-32030.

Authored-by: xy_xin <xianyin.xxy@alibaba-inc.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-29 13:13:42 +00:00
yi.wu 6fcb70e0ca [SPARK-32055][CORE][SQL] Unify getReader and getReaderForRange in ShuffleManager
### What changes were proposed in this pull request?

This PR tries to unify the method `getReader` and `getReaderForRange` in `ShuffleManager`.

### Why are the changes needed?

Reduce the duplicate codes, simplify the implementation, and for better maintenance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Covered by existing tests.

Closes #28895 from Ngone51/unify-getreader.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-29 11:37:03 +00:00
Liang-Chi Hsieh 4204a63d4f [SPARK-32056][SQL] Coalesce partitions for repartition by expressions when AQE is enabled
### What changes were proposed in this pull request?

This patch proposes to coalesce partitions for repartition by expressions without specifying number of partitions, when AQE is enabled.

### Why are the changes needed?

When repartition by some partition expressions, users can specify number of partitions or not. If  the number of partitions is specified, we should not coalesce partitions because it breaks user expectation. But if without specifying number of partitions, AQE should be able to coalesce partitions as other shuffling.

### Does this PR introduce _any_ user-facing change?

Yes. After this change, if users don't specify the number of partitions when repartitioning data by expressions, AQE will coalesce partitions.

### How was this patch tested?

Added unit test.

Closes #28900 from viirya/SPARK-32056.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-29 11:33:40 +00:00
Wenchen Fan 835ef425d0 [SPARK-32038][SQL][FOLLOWUP] Make the alias name pretty after float/double normalization
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/28876/files

This PR proposes to use the name of the original expression, as the alias name of the normalization expression.

### Why are the changes needed?

make the query plan looks pretty when EXPLAIN.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

manually explain the query

Closes #28919 from cloud-fan/follow.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-06-28 21:55:19 -07:00
yi.wu 0ec17c989d [SPARK-32090][SQL] Improve UserDefinedType.equal() to make it be symmetrical
### What changes were proposed in this pull request?

This PR fix `UserDefinedType.equal()` by comparing the UDT class instead of checking `acceptsType()`.

### Why are the changes needed?

It's weird that equality comparison between two UDT types can have different result by switching the order:

```scala
// ExampleSubTypeUDT.userClass is a subclass of ExampleBaseTypeUDT.userClass
val udt1 = new ExampleBaseTypeUDT
val udt2 = new ExampleSubTypeUDT
println(udt1 == udt2) // true
println(udt2 == udt1) // false
```

### Does this PR introduce _any_ user-facing change?

Yes.

Before:
```scala
// ExampleSubTypeUDT.userClass is a subclass of ExampleBaseTypeUDT.userClass
val udt1 = new ExampleBaseTypeUDT
val udt2 = new ExampleSubTypeUDT
println(udt1 == udt2) // true
println(udt2 == udt1) // false
```

After:
```scala
// ExampleSubTypeUDT.userClass is a subclass of ExampleBaseTypeUDT.userClass
val udt1 = new ExampleBaseTypeUDT
val udt2 = new ExampleSubTypeUDT
println(udt1 == udt2) // false
println(udt2 == udt1) // false
```

### How was this patch tested?

Added a unit test.

Closes #28923 from Ngone51/fix-udt-equal.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-06-28 21:49:10 -07:00
Yuanjian Li f944603872 [SPARK-32126][SS] Scope Session.active in IncrementalExecution
### What changes were proposed in this pull request?

The `optimizedPlan` in IncrementalExecution should also be scoped in `withActive`.

### Why are the changes needed?

Follow-up of SPARK-30798 for the Streaming side.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UT.

Closes #28936 from xuanyuanking/SPARK-30798-follow.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-06-28 21:35:59 -07:00
Yuanjian Li 6484c14c57 [SPARK-32115][SQL] Fix SUBSTRING to handle integer overflows
### What changes were proposed in this pull request?
Bug fix for overflow case in `UTF8String.substringSQL`.

### Why are the changes needed?
SQL query `SELECT SUBSTRING("abc", -1207959552, -1207959552)` incorrectly returns` "abc"` against expected output of `""`. For query `SUBSTRING("abc", -100, -100)`, we'll get the right output of `""`.

### Does this PR introduce _any_ user-facing change?
Yes, bug fix for the overflow case.

### How was this patch tested?
New UT.

Closes #28937 from xuanyuanking/SPARK-32115.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-06-28 12:22:44 -07:00
Max Gekk 8c44d74463 [SPARK-32071][SQL][TESTS] Add make_interval benchmark
### What changes were proposed in this pull request?
Add benchmarks for interval constructor `make_interval` and measure perf of 4 cases:
1. Constant (year, month)
2. Constant (week, day)
3. Constant (hour, minute, second, second fraction)
4. All fields are NOT constant.

The benchmark results are generated in the environment:

| Item | Description |
| ---- | ----|
| Region | us-west-2 (Oregon) |
| Instance | r3.xlarge |
| AMI | ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5) |
| Java | OpenJDK 64-Bit Server VM 1.8.0_252 and OpenJDK 64-Bit Server VM 11.0.7+10 |

### Why are the changes needed?
To have a base line for future perf improvements of `make_interval`, and to prevent perf regressions in the future.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running `IntervalBenchmark` via:
```
$ SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.IntervalBenchmark"
```

Closes #28905 from MaxGekk/benchmark-make_interval.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-06-27 17:54:06 -07:00
GuoPhilipse ac3a0551d8 [SPARK-32088][PYTHON] Pin the timezone in timestamp_seconds doctest
### What changes were proposed in this pull request?

Add American timezone during timestamp_seconds doctest

### Why are the changes needed?

`timestamp_seconds` doctest in `functions.py` used default timezone to get expected result
For example:

```python
>>> time_df = spark.createDataFrame([(1230219000,)], ['unix_time'])
>>> time_df.select(timestamp_seconds(time_df.unix_time).alias('ts')).collect()
[Row(ts=datetime.datetime(2008, 12, 25, 7, 30))]
```

But when we have a non-american timezone, the test case will get different test result.

For example, when we set current timezone as `Asia/Shanghai`, the test result will be

```
[Row(ts=datetime.datetime(2008, 12, 25, 23, 30))]
```

So no matter where we run the test case ,we will always get the expected permanent result if we set the timezone on one specific area.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

Closes #28932 from GuoPhilipse/SPARK-32088-fix-timezone-issue.

Lead-authored-by: GuoPhilipse <46367746+GuoPhilipse@users.noreply.github.com>
Co-authored-by: GuoPhilipse <guofei_ok@126.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-06-26 19:06:31 -07:00
Pablo Langa bbb2cba615 [SPARK-32025][SQL] Csv schema inference problems with different types in the same column
### What changes were proposed in this pull request?

This pull request fixes a bug present in the csv type inference.
We have problems when we have different types in the same column.

**Previously:**
```
$ cat /example/f1.csv
col1
43200000
true

spark.read.csv(path="file:///example/*.csv", header=True, inferSchema=True).show()
+----+
|col1|
+----+
|null|
|true|
+----+

root
 |-- col1: boolean (nullable = true)
```
**Now**
```
spark.read.csv(path="file:///example/*.csv", header=True, inferSchema=True).show()
+-------------+
|col1          |
+-------------+
|43200000 |
|true           |
+-------------+

root
 |-- col1: string (nullable = true)
```

Previously the hierarchy of type inference is the following:

> IntegerType
> > LongType
> > > DecimalType
> > > > DoubleType
> > > > > TimestampType
> > > > > > BooleanType
> > > > > > > StringType

So, when, for example, we have integers in one column, and the last element is a boolean, all the column is inferred as a boolean column incorrectly and all the number are shown as null when you see the data

We need the following hierarchy. When we have different numeric types in the column it will be resolved correctly. And when we have other different types it will be resolved as a String type column
> IntegerType
> > LongType
> > > DecimalType
> > > > DoubleType
> > > > > StringType

> TimestampType
> > StringType

> BooleanType
> > StringType

> StringType

### Why are the changes needed?

Fix the bug explained

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test and manual tests

Closes #28896 from planga82/feature/SPARK-32025_csv_inference.

Authored-by: Pablo Langa <soypab@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-06-26 10:41:27 +09:00
yi.wu 47fb9d6054 [SPARK-32087][SQL] Allow UserDefinedType to use encoder to deserialize rows in ScalaUDF as well
### What changes were proposed in this pull request?

This PR tries to address the comment: https://github.com/apache/spark/pull/28645#discussion_r442183888
It changes `canUpCast/canCast` to allow cast from sub UDT to base UDT, in order to achieve the goal to allow UserDefinedType to use `ExpressionEncoder` to deserialize rows in ScalaUDF as well.

One thing that needs to mention is, even we allow cast from sub UDT to base UDT, it doesn't really do the cast in `Cast`. Because, yet, sub UDT and base UDT are considered as the same type(because of #16660), see:

5264164a67/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala (L81-L86)

5264164a67/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala (L92-L95)

Therefore, the optimize rule `SimplifyCast` will eliminate the cast at the end.

### Why are the changes needed?

Reduce the special case caused by `UserDefinedType` in `ResolveEncodersInUDF` and `ScalaUDF`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

It should be covered by the test of `SPARK-19311`, which is also updated a little in this PR.

Closes #28920 from Ngone51/fix-udf-udt.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-24 14:50:45 +00:00
Bryan Cutler df04107934 [SPARK-32080][SPARK-31998][SQL] Simplify ArrowColumnVector ListArray accessor
### What changes were proposed in this pull request?

This change simplifies the ArrowColumnVector ListArray accessor to use provided Arrow APIs available in v0.15.0 to calculate element indices.

### Why are the changes needed?

This simplifies the code by avoiding manual calculations on the Arrow offset buffer and makes use of more stable APIs.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

Closes #28915 from BryanCutler/arrow-simplify-ArrowColumnVector-ListArray-SPARK-32080.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-06-24 22:13:54 +09:00
ulysses 9f540fac2e [SPARK-32062][SQL] Reset listenerRegistered in SparkSession
### What changes were proposed in this pull request?

Reset listenerRegistered when application end.

### Why are the changes needed?

Within a jvm, stop and create `SparkContext` multi times will cause the bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Add UT.

Closes #28899 from ulysses-you/SPARK-32062.

Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-24 04:50:46 +00:00
Max Gekk 045106e29d [SPARK-32072][CORE][TESTS] Fix table formatting with benchmark results
### What changes were proposed in this pull request?
Set column width w/ benchmark names to maximum of either
1. 40 (before this PR) or
2. The length of benchmark name or
3. Maximum length of cases names

### Why are the changes needed?
To improve readability of benchmark results. For example, `MakeDateTimeBenchmark`.

Before:
```
make_timestamp():                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
prepare make_timestamp()                           3636           3673          38          0.3        3635.7       1.0X
make_timestamp(2019, 1, 2, 3, 4, 50.123456)             94             99           4         10.7          93.8      38.8X
make_timestamp(2019, 1, 2, 3, 4, 60.000000)             68             80          13         14.6          68.3      53.2X
make_timestamp(2019, 12, 31, 23, 59, 60.00)             65             79          19         15.3          65.3      55.7X
make_timestamp(*, *, *, 3, 4, 50.123456)            271            280          14          3.7         270.7      13.4X
```

After:
```
make_timestamp():                            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------
prepare make_timestamp()                              3694           3745          82          0.3        3694.0       1.0X
make_timestamp(2019, 1, 2, 3, 4, 50.123456)             82             90           9         12.2          82.3      44.9X
make_timestamp(2019, 1, 2, 3, 4, 60.000000)             72             77           5         13.9          71.9      51.4X
make_timestamp(2019, 12, 31, 23, 59, 60.00)             67             71           5         15.0          66.8      55.3X
make_timestamp(*, *, *, 3, 4, 50.123456)               273            289          14          3.7         273.2      13.5X
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By re-generating benchmark results for `MakeDateTimeBenchmark`:
```
$ SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.MakeDateTimeBenchmark"
```
in the environment:

| Item | Description |
| ---- | ----|
| Region | us-west-2 (Oregon) |
| Instance | r3.xlarge |
| AMI | ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5) |
| Java | OpenJDK 64-Bit Server VM 1.8.0_252 and OpenJDK 64-Bit Server VM 11.0.7+10 |

Closes #28906 from MaxGekk/benchmark-table-formatting.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-24 04:43:53 +00:00
Max Gekk e00f43cb86 [SPARK-32043][SQL] Replace Decimal by Int op in make_interval and make_timestamp
### What changes were proposed in this pull request?
Replace Decimal by Int op in the `MakeInterval` & `MakeTimestamp` expression. For instance, `(secs * Decimal(MICROS_PER_SECOND)).toLong` can be replaced by the unscaled long because the former one already contains microseconds.

### Why are the changes needed?
To improve performance.

Before:
```
make_timestamp():                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
...
make_timestamp(2019, 1, 2, 3, 4, 50.123456)             94             99           4         10.7          93.8      38.8X
```

After:
```
make_timestamp(2019, 1, 2, 3, 4, 50.123456)             76             92          15         13.1          76.5      48.1X
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- By existing test suites `IntervalExpressionsSuite`, `DateExpressionsSuite` and etc.
- Re-generate results of `MakeDateTimeBenchmark` in the environment:

| Item | Description |
| ---- | ----|
| Region | us-west-2 (Oregon) |
| Instance | r3.xlarge |
| AMI | ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5) |
| Java | OpenJDK 64-Bit Server VM 1.8.0_252 and OpenJDK 64-Bit Server VM 11.0.7+10 |

Closes #28886 from MaxGekk/make_interval-opt-decimal.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-23 11:45:12 +00:00
Max Gekk fcf9768098 [SPARK-32052][SQL] Extract common code from date-time field expressions
### What changes were proposed in this pull request?
Extract common code from the expressions that get date or time fields from input dates/timestamps to new expressions `GetDateField` and `GetTimeField`, and re-use the common traits from the affected classes.

### Why are the changes needed?
Code deduplication improves maintainability.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By `DateExpressionsSuite`

Closes #28894 from MaxGekk/get-date-time-field-expr.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-23 06:13:55 +00:00
Max Gekk 979a8eb04a [MINOR][SQL] Simplify DateTimeUtils.cleanLegacyTimestampStr
### What changes were proposed in this pull request?
Call the `replace()` method from `UTF8String` to remove the `GMT` string from the input of `DateTimeUtils.cleanLegacyTimestampStr`. It removes all `GMT` substrings.

### Why are the changes needed?
Simpler code improves maintainability

### Does this PR introduce _any_ user-facing change?
Should not

### How was this patch tested?
By existing test suites `JsonSuite` and `UnivocityParserSuite`.

Closes #28892 from MaxGekk/simplify-cleanLegacyTimestampStr.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-23 05:53:04 +00:00
yi.wu 338efee509 [SPARK-32031][SQL] Fix the wrong references of the PartialMerge/Final AggregateExpression
### What changes were proposed in this pull request?

This PR changes the references of the `PartialMerge`/`Final` `AggregateExpression` from `aggBufferAttributes` to `inputAggBufferAttributes`.

After this change, the tests of `SPARK-31620` can fail on the assertion of `QueryTest.assertEmptyMissingInput`.  So, this PR also fixes it by overriding the `inputAggBufferAttributes` of the Aggregate operators.

### Why are the changes needed?

With my understanding of Aggregate framework, especially, according to the logic of `AggUtils.planAggXXX`, I think for the `PartialMerge`/`Final` `AggregateExpression` the right references should be `inputAggBufferAttributes`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Before this patch, for an Aggregate operator, its input attributes will always be equal to or more than(because it refers to its own attributes while it should refer to the attributes from the child) its reference attributes. Therefore, its missing inputs must always be empty and break nothing. Thus, it's impossible to add a UT for this patch.

However, after correcting the right references in this PR, the problem is then exposed by `QueryTest.assertEmptyMissingInput` in the UT of SPARK-31620, since missing inputs are no longer always empty. This PR can fix the problem.

Closes #28869 from Ngone51/fix-agg-reference.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-22 13:59:46 +00:00
Dilip Biswal 6293c38cff [MINOR][SQL] Add IS [NOT] NULL examples to ArrayFilter/ArrayExists
### What changes were proposed in this pull request?
A minor PR that adds a couple of usage examples for ArrayFilter and ArrayExists that shows how to deal with NULL data.

### Why are the changes needed?
Enhances the examples that shows how to filter out null values from an array and also to test if null value exists in an array.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Tested manually.

Closes #28890 from dilipbiswal/array_func_description.

Authored-by: Dilip Biswal <dkbiswal@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-06-22 21:38:19 +09:00
Liang-Chi Hsieh 2e4557f45c [SPARK-32038][SQL] NormalizeFloatingNumbers should also work on distinct aggregate
### What changes were proposed in this pull request?

This patch applies `NormalizeFloatingNumbers` to distinct aggregate to fix a regression of distinct aggregate on NaNs.

### Why are the changes needed?

We added `NormalizeFloatingNumbers` optimization rule in 3.0.0 to normalize special floating numbers (NaN and -0.0). But it is missing in distinct aggregate so causes a regression. We need to apply this rule on distinct aggregate to fix it.

### Does this PR introduce _any_ user-facing change?

Yes, fixing a regression of distinct aggregate on NaNs.

### How was this patch tested?

Added unit test.

Closes #28876 from viirya/SPARK-32038.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-06-22 04:58:22 -07:00
Yuanjian Li 6fdea63b15 [SPARK-31905][SS] Add compatibility tests for streaming state store format
### What changes were proposed in this pull request?
Add compatibility tests for streaming state store format.

### Why are the changes needed?
After SPARK-31894, we have a validation checking for the streaming state store. It's better to add integrated tests in the PR builder as soon as the breaking changes introduced.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Test only.

Closes #28725 from xuanyuanking/compatibility_check.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-22 07:56:59 +00:00
Kent Yao 9f8e15bb2e [SPARK-32034][SQL] Port HIVE-14817: Shutdown the SessionManager timeoutChecker thread properly upon shutdown
### What changes were proposed in this pull request?

This PR port https://issues.apache.org/jira/browse/HIVE-14817 for spark thrift server.

### Why are the changes needed?

When stopping the HiveServer2, the non-daemon thread stops the server from terminating

```sql
"HiveServer2-Background-Pool: Thread-79" #79 prio=5 os_prio=31 tid=0x00007fde26138800 nid=0x13713 waiting on condition [0x0000700010c32000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at org.apache.hive.service.cli.session.SessionManager$1.sleepInterval(SessionManager.java:178)
	at org.apache.hive.service.cli.session.SessionManager$1.run(SessionManager.java:156)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

Here is an example to reproduce:
https://github.com/yaooqinn/kyuubi/blob/master/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/spark/SparkSQLEngineApp.scala

Also, it causes issues as HIVE-14817 described which

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?

Passing Jenkins

Closes #28870 from yaooqinn/SPARK-32034.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-06-21 16:28:00 -07:00
ulysses 978493467c [SPARK-32019][SQL] Add spark.sql.files.minPartitionNum config
### What changes were proposed in this pull request?

Add a new config `spark.sql.files.minPartitionNum` to control file split partition in local session.

### Why are the changes needed?

Aims to control file split partitions in session level.
More details see discuss in [PR-28778](https://github.com/apache/spark/pull/28778).

### Does this PR introduce _any_ user-facing change?

Yes, new config.

### How was this patch tested?

Add UT.

Closes #28853 from ulysses-you/SPARK-32019.

Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-06-20 18:38:44 -07:00
Kent Yao 93529a8536 [SPARK-31957][SQL] Cleanup hive scratch dir for the developer api startWithContext
### What changes were proposed in this pull request?

Comparing to the long-running ThriftServer via start-script, we are more likely to hit the issue https://issues.apache.org/jira/browse/HIVE-10415 / https://issues.apache.org/jira/browse/SPARK-31626 in the developer API `startWithContext`

This PR apply SPARK-31626 to the developer API `startWithContext`

### Why are the changes needed?

Fix the issue described in  SPARK-31626

### Does this PR introduce _any_ user-facing change?

Yes, the hive scratch dir will be deleted if cleanup is enabled for calling `startWithContext`

### How was this patch tested?

new test

Closes #28784 from yaooqinn/SPARK-31957.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-06-19 19:54:46 -07:00
Max Gekk 66ba35666a [SPARK-32021][SQL] Increase precision of seconds and fractions of make_interval
### What changes were proposed in this pull request?
Change precision of seconds and its fraction from 8 to 18 to be able to construct intervals of max allowed microseconds value (long).

### Why are the changes needed?
To improve UX of Spark SQL.

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
- Add tests to IntervalExpressionsSuite
- Add an example to the `MakeInterval` expression
- Add tests to `interval.sql`

Closes #28873 from MaxGekk/make_interval-sec-precision.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-06-19 19:33:13 -07:00
TJX2014 177a380bcf [SPARK-31980][SQL] Function sequence() fails if start and end of range are equal dates
### What changes were proposed in this pull request?
1. Add judge equal as bigger condition in `org.apache.spark.sql.catalyst.expressions.Sequence.TemporalSequenceImpl#eval`
2. Unit test for interval `day`, `month`, `year`

### Why are the changes needed?
Bug exists when sequence input get same equal start and end dates, which will occur `while loop` forever

### Does this PR introduce _any_ user-facing change?
Yes,
Before this PR, people will get a `java.lang.ArrayIndexOutOfBoundsException`, when eval as below:
`sql("select sequence(cast('2011-03-01' as date), cast('2011-03-01' as date), interval 1 year)").show(false)
`

### How was this patch tested?
Unit test.

Closes #28819 from TJX2014/master-SPARK-31980.

Authored-by: TJX2014 <xiaoxingstack@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-06-19 19:24:34 -07:00
Terry Kim 7b8683820b [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable
### What changes were proposed in this pull request?

When two bucketed tables with different number of buckets are joined, it can introduce a full shuffle:
```
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
val df1 = (0 until 20).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
val df2 = (0 until 20).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k")
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val joined = t1.join(t2, t1("i") === t2("i"))
joined.explain

== Physical Plan ==
*(5) SortMergeJoin [i#44], [i#50], Inner
:- *(2) Sort [i#44 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i#44, 200), true, [id=#105]
:     +- *(1) Project [i#44, j#45, k#46]
:        +- *(1) Filter isnotnull(i#44)
:           +- *(1) ColumnarToRow
:              +- FileScan parquet default.t1[i#44,j#45,k#46] Batched: true, DataFilters: [isnotnull(i#44)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 8 out of 8
+- *(4) Sort [i#50 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i#50, 200), true, [id=#115]
      +- *(3) Project [i#50, j#51, k#52]
         +- *(3) Filter isnotnull(i#50)
            +- *(3) ColumnarToRow
               +- FileScan parquet default.t2[i#50,j#51,k#52] Batched: true, DataFilters: [isnotnull(i#50)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4
```
This PR proposes to introduce coalescing buckets when the following conditions are met to eliminate the full shuffle:
- Join is the sort merge one (which is created only for equi-join).
- Join keys match with output partition expressions on their respective sides.
- The larger bucket number is divisible by the smaller bucket number.
- `spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled` is set to `true`.
- The ratio of the number of buckets should be less than the value set in `spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio`.

### Why are the changes needed?

Eliminating the full shuffle can benefit for scenarios where two large tables are joined. Especially when the tables are already bucketed but differ in the number of buckets, we could take advantage of it.

### Does this PR introduce any user-facing change?

If the bucket coalescing conditions explained above are met, a full shuffle can be eliminated (also note that you will see `SelectedBucketsCount: 8 out of 8 (Coalesced to 4)` in the physical plan):
```
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
spark.conf.set("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled", "true")
val df1 = (0 until 20).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
val df2 = (0 until 20).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k")
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val joined = t1.join(t2, t1("i") === t2("i"))
joined.explain

== Physical Plan ==
*(3) SortMergeJoin [i#44], [i#50], Inner
:- *(1) Sort [i#44 ASC NULLS FIRST], false, 0
:  +- *(1) Project [i#44, j#45, k#46]
:     +- *(1) Filter isnotnull(i#44)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet default.t1[i#44,j#45,k#46] Batched: true, DataFilters: [isnotnull(i#44)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 8 out of 8 (Coalesced to 4)
+- *(2) Sort [i#50 ASC NULLS FIRST], false, 0
   +- *(2) Project [i#50, j#51, k#52]
      +- *(2) Filter isnotnull(i#50)
         +- *(2) ColumnarToRow
            +- FileScan parquet default.t2[i#50,j#51,k#52] Batched: true, DataFilters: [isnotnull(i#50)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4
```

### How was this patch tested?

Added unit tests

Closes #28123 from imback82/coalescing_bucket.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-06-20 08:20:45 +09:00
yi.wu 5ee5cfd9c0 [SPARK-31826][SQL] Support composed type of case class for typed Scala UDF
### What changes were proposed in this pull request?

This PR adds support for typed Scala UDF to accept composed type of case class, e.g. Seq[T], Array[T], Map[Int, T] (assuming T is case class type), as input parameter type.

### Why are the changes needed?

After #27937, typed Scala UDF now has supported case class as its input parameter type. However, it can not accept the composed type of case class, such as Seq[T], Array[T], Map[Int, T] (assuming T is case class type), which causing confuse(e.g. https://github.com/apache/spark/pull/27937#discussion_r422699979) to the user.

### Does this PR introduce _any_ user-facing change?

Yes.

Run the query:

```
scala> case class Person(name: String, age: Int)
scala> Seq((1, Seq(Person("Jack", 5)))).toDF("id", "persons").withColumn("ages", udf{ s: Seq[Person] => s.head.age }.apply(col("persons"))).show

```

Before:

```

org.apache.spark.SparkException: Failed to execute user defined function($read$$Lambda$2861/628175152: (array<struct<name:string,age:int>>) => int)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1129)
  at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:156)
  at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:83)
  at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$17.$anonfun$applyOrElse$69(Optimizer.scala:1492)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)

....

Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Person
  at $anonfun$res3$1(<console>:30)
  at $anonfun$res3$1$adapted(<console>:30)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$f$2(ScalaUDF.scala:156)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1126)
  ... 142 more
```

After:
```
+---+-----------+----+
| id|    persons|ages|
+---+-----------+----+
|  1|[[Jack, 5]]| [5]|
+---+-----------+----+
```

### How was this patch tested?

Added tests.

Closes #28645 from Ngone51/impr-udf.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-19 12:45:47 +00:00
Jungtaek Lim (HeartSaVioR) 6fe3bf66eb [SPARK-31993][SQL] Build arrays for passing variables generated from children for 'concat_ws' with columns having at least one of array type
### What changes were proposed in this pull request?

Please refer the next section `Why are the changes needed?` for details how the current implementation of `concat_ws` is broken for some condition.

This patch fixes the code generation logic for columns having at least one array types of columns in `concat_ws` to build two arrays for storing isNull and value from children's generated code and pass these arrays to the both varargCounts and varargBuilds. This change guarantees that both varargCounts and varargBuilds can access the relevant local variables the children's generated code makes as array parameters, which is critical to ensure both varargCounts and varargBuilds succeed to compile.

Below is the generated code for newly added UT, `SPARK-31993: concat_ws in agg function with plenty of string/array types columns`.

> before the patch

```
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 009 */
/* 010 */   public SpecificUnsafeProjection(Object[] references) {
/* 011 */     this.references = references;
/* 012 */     mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 32);
/* 013 */
/* 014 */   }
/* 015 */
/* 016 */   public void initialize(int partitionIndex) {
/* 017 */
/* 018 */   }
/* 019 */
/* 020 */   // Scala.Function1 need this
/* 021 */   public java.lang.Object apply(java.lang.Object row) {
/* 022 */     return apply((InternalRow) row);
/* 023 */   }
/* 024 */
/* 025 */   public UnsafeRow apply(InternalRow i) {
/* 026 */     mutableStateArray_0[0].reset();
/* 027 */
/* 028 */
/* 029 */     mutableStateArray_0[0].zeroOutNullBytes();
/* 030 */
/* 031 */     apply_0_0(i);
/* 032 */     apply_0_1(i);
/* 033 */     int varargNum_0 = 30;
/* 034 */     int idxInVararg_0 = 0;
/* 035 */
/* 036 */     if (!isNull_2) {
/* 037 */       varargNum_0 += value_2.numElements();
/* 038 */     }
/* 039 */
/* 040 */     if (!isNull_3) {
/* 041 */       varargNum_0 += value_3.numElements();
/* 042 */     }
/* 043 */
/* 044 */     UTF8String[] array_0 = new UTF8String[varargNum_0];
/* 045 */     idxInVararg_0 = varargBuildsConcatWs_0_0(i, array_0, idxInVararg_0);
/* 046 */     idxInVararg_0 = varargBuildsConcatWs_0_1(i, array_0, idxInVararg_0);
/* 047 */     idxInVararg_0 = varargBuildsConcatWs_0_2(i, array_0, idxInVararg_0);
/* 048 */     UTF8String value_0 = UTF8String.concatWs(((UTF8String) references[0] /* literal */), array_0);
/* 049 */     boolean isNull_0 = value_0 == null;
/* 050 */     mutableStateArray_0[0].write(0, value_0);
/* 051 */     return (mutableStateArray_0[0].getRow());
/* 052 */   }
/* 053 */
/* 054 */
/* 055 */   private void apply_0_1(InternalRow i) {
/* 056 */     UTF8String value_25 = i.getUTF8String(22);UTF8String value_26 = i.getUTF8String(23);UTF8String value_27 = i.getUTF8String(24);UTF8String value_28 = i.getUTF8String(25);UTF8String value_29 = i.getUTF8String(26);UTF8String value_30 = i.getUTF8String(27);UTF8String value_31 = i.getUTF8String(28);UTF8String value_32 = i.getUTF8String(29);UTF8String value_33 = i.getUTF8String(30);
/* 057 */   }
/* 058 */
/* 059 */
/* 060 */   private int varargBuildsConcatWs_0_0(InternalRow i, UTF8String [] array_0, int idxInVararg_0) {
/* 061 */
/* 062 */
/* 063 */     if (!isNull_2) {
/* 064 */       final int n_0 = value_2.numElements();
/* 065 */       for (int j = 0; j < n_0; j ++) {
/* 066 */         array_0[idxInVararg_0 ++] = value_2.getUTF8String(j);
/* 067 */       }
/* 068 */     }
/* 069 */
/* 070 */     if (!isNull_3) {
/* 071 */       final int n_1 = value_3.numElements();
/* 072 */       for (int j = 0; j < n_1; j ++) {
/* 073 */         array_0[idxInVararg_0 ++] = value_3.getUTF8String(j);
/* 074 */       }
/* 075 */     }
/* 076 */     array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_4;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_5;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_6;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_7;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_8;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_9;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_10;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_11;
/* 077 */     return idxInVararg_0;
/* 078 */
/* 079 */   }
/* 080 */
/* 081 */
/* 082 */   private int varargBuildsConcatWs_0_2(InternalRow i, UTF8String [] array_0, int idxInVararg_0) {
/* 083 */
/* 084 */     array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_28;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_29;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_30;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_31;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_32;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_33;
/* 085 */     return idxInVararg_0;
/* 086 */
/* 087 */   }
/* 088 */
/* 089 */
/* 090 */   private void apply_0_0(InternalRow i) {
/* 091 */     boolean isNull_2 = i.isNullAt(31);
/* 092 */     ArrayData value_2 = isNull_2 ?
/* 093 */     null : (i.getArray(31));boolean isNull_3 = i.isNullAt(32);
/* 094 */     ArrayData value_3 = isNull_3 ?
/* 095 */     null : (i.getArray(32));UTF8String value_4 = i.getUTF8String(1);UTF8String value_5 = i.getUTF8String(2);UTF8String value_6 = i.getUTF8String(3);UTF8String value_7 = i.getUTF8String(4);UTF8String value_8 = i.getUTF8String(5);UTF8String value_9 = i.getUTF8String(6);UTF8String value_10 = i.getUTF8String(7);UTF8String value_11 = i.getUTF8String(8);UTF8String value_12 = i.getUTF8String(9);UTF8String value_13 = i.getUTF8String(10);UTF8String value_14 = i.getUTF8String(11);UTF8String value_15 = i.getUTF8String(12);UTF8String value_16 = i.getUTF8String(13);UTF8String value_17 = i.getUTF8String(14);UTF8String value_18 = i.getUTF8String(15);UTF8String value_19 = i.getUTF8String(16);UTF8String value_20 = i.getUTF8String(17);UTF8String value_21 = i.getUTF8String(18);UTF8String value_22 = i.getUTF8String(19);UTF8String value_23 = i.getUTF8String(20);UTF8String value_24 = i.getUTF8String(21);
/* 096 */   }
/* 097 */
/* 098 */
/* 099 */   private int varargBuildsConcatWs_0_1(InternalRow i, UTF8String [] array_0, int idxInVararg_0) {
/* 100 */
/* 101 */     array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_12;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_13;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_14;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_15;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_16;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_17;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_18;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_19;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_20;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_21;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_22;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_23;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_24;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_25;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_26;array_0[idxInVararg_0 ++] = false ? (UTF8String) null : value_27;
/* 102 */     return idxInVararg_0;
/* 103 */
/* 104 */   }
/* 105 */
/* 106 */ }
```

Compilation of the generated code fails with error message: `org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 36, Column 6: Expression "isNull_2" is not an rvalue`

> after the patch

```
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private boolean globalIsNull_0;
/* 009 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 010 */
/* 011 */   public SpecificUnsafeProjection(Object[] references) {
/* 012 */     this.references = references;
/* 013 */
/* 014 */     mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 32);
/* 015 */
/* 016 */   }
/* 017 */
/* 018 */   public void initialize(int partitionIndex) {
/* 019 */
/* 020 */   }
/* 021 */
/* 022 */   // Scala.Function1 need this
/* 023 */   public java.lang.Object apply(java.lang.Object row) {
/* 024 */     return apply((InternalRow) row);
/* 025 */   }
/* 026 */
/* 027 */   public UnsafeRow apply(InternalRow i) {
/* 028 */     mutableStateArray_0[0].reset();
/* 029 */
/* 030 */
/* 031 */     mutableStateArray_0[0].zeroOutNullBytes();
/* 032 */
/* 033 */     UTF8String value_34 = ConcatWs_0(i);
/* 034 */     mutableStateArray_0[0].write(0, value_34);
/* 035 */     return (mutableStateArray_0[0].getRow());
/* 036 */   }
/* 037 */
/* 038 */
/* 039 */   private void initializeArgsArrays_0_0(InternalRow i, boolean [] isNullArgs_0, Object [] valueArgs_0) {
/* 040 */
/* 041 */     boolean isNull_2 = i.isNullAt(31);
/* 042 */     ArrayData value_2 = isNull_2 ?
/* 043 */     null : (i.getArray(31));
/* 044 */     isNullArgs_0[0] = isNull_2;
/* 045 */     valueArgs_0[0] = value_2;
/* 046 */
/* 047 */     boolean isNull_3 = i.isNullAt(32);
/* 048 */     ArrayData value_3 = isNull_3 ?
/* 049 */     null : (i.getArray(32));
/* 050 */     isNullArgs_0[1] = isNull_3;
/* 051 */     valueArgs_0[1] = value_3;
/* 052 */
/* 053 */     UTF8String value_4 = i.getUTF8String(1);
/* 054 */     isNullArgs_0[2] = false;
/* 055 */     valueArgs_0[2] = value_4;
/* 056 */
/* 057 */     UTF8String value_5 = i.getUTF8String(2);
/* 058 */     isNullArgs_0[3] = false;
/* 059 */     valueArgs_0[3] = value_5;
/* 060 */
/* 061 */     UTF8String value_6 = i.getUTF8String(3);
/* 062 */     isNullArgs_0[4] = false;
/* 063 */     valueArgs_0[4] = value_6;
/* 064 */
/* 065 */     UTF8String value_7 = i.getUTF8String(4);
/* 066 */     isNullArgs_0[5] = false;
/* 067 */     valueArgs_0[5] = value_7;
/* 068 */
/* 069 */     UTF8String value_8 = i.getUTF8String(5);
/* 070 */     isNullArgs_0[6] = false;
/* 071 */     valueArgs_0[6] = value_8;
/* 072 */
/* 073 */   }
/* 074 */
/* 075 */
/* 076 */   private void initializeArgsArrays_0_3(InternalRow i, boolean [] isNullArgs_0, Object [] valueArgs_0) {
/* 077 */
/* 078 */     UTF8String value_25 = i.getUTF8String(22);
/* 079 */     isNullArgs_0[23] = false;
/* 080 */     valueArgs_0[23] = value_25;
/* 081 */
/* 082 */     UTF8String value_26 = i.getUTF8String(23);
/* 083 */     isNullArgs_0[24] = false;
/* 084 */     valueArgs_0[24] = value_26;
/* 085 */
/* 086 */     UTF8String value_27 = i.getUTF8String(24);
/* 087 */     isNullArgs_0[25] = false;
/* 088 */     valueArgs_0[25] = value_27;
/* 089 */
/* 090 */     UTF8String value_28 = i.getUTF8String(25);
/* 091 */     isNullArgs_0[26] = false;
/* 092 */     valueArgs_0[26] = value_28;
/* 093 */
/* 094 */     UTF8String value_29 = i.getUTF8String(26);
/* 095 */     isNullArgs_0[27] = false;
/* 096 */     valueArgs_0[27] = value_29;
/* 097 */
/* 098 */     UTF8String value_30 = i.getUTF8String(27);
/* 099 */     isNullArgs_0[28] = false;
/* 100 */     valueArgs_0[28] = value_30;
/* 101 */
/* 102 */     UTF8String value_31 = i.getUTF8String(28);
/* 103 */     isNullArgs_0[29] = false;
/* 104 */     valueArgs_0[29] = value_31;
/* 105 */
/* 106 */     UTF8String value_32 = i.getUTF8String(29);
/* 107 */     isNullArgs_0[30] = false;
/* 108 */     valueArgs_0[30] = value_32;
/* 109 */
/* 110 */   }
/* 111 */
/* 112 */
/* 113 */   private int varargBuildsConcatWs_0_3(InternalRow i, UTF8String [] array_0, int idxInVararg_0, boolean [] isNullArgs_0, Object [] valueArgs_0) {
/* 114 */
/* 115 */     array_0[idxInVararg_0 ++] = isNullArgs_0[29] ? (UTF8String) null : ((UTF8String) valueArgs_0[29]);array_0[idxInVararg_0 ++] = isNullArgs_0[30] ? (UTF8String) null : ((UTF8String) valueArgs_0[30]);array_0[idxInVararg_0 ++] = isNullArgs_0[31] ? (UTF8String) null : ((UTF8String) valueArgs_0[31]);
/* 116 */     return idxInVararg_0;
/* 117 */
/* 118 */   }
/* 119 */
/* 120 */
/* 121 */   private int varargBuildsConcatWs_0_0(InternalRow i, UTF8String [] array_0, int idxInVararg_0, boolean [] isNullArgs_0, Object [] valueArgs_0) {
/* 122 */
/* 123 */
/* 124 */     if (!isNullArgs_0[0]) {
/* 125 */       final int n_0 = ((ArrayData) valueArgs_0[0]).numElements();
/* 126 */       for (int j = 0; j < n_0; j ++) {
/* 127 */         array_0[idxInVararg_0 ++] = ((ArrayData) valueArgs_0[0]).getUTF8String(j);
/* 128 */       }
/* 129 */     }
/* 130 */
/* 131 */     if (!isNullArgs_0[1]) {
/* 132 */       final int n_1 = ((ArrayData) valueArgs_0[1]).numElements();
/* 133 */       for (int j = 0; j < n_1; j ++) {
/* 134 */         array_0[idxInVararg_0 ++] = ((ArrayData) valueArgs_0[1]).getUTF8String(j);
/* 135 */       }
/* 136 */     }
/* 137 */     array_0[idxInVararg_0 ++] = isNullArgs_0[2] ? (UTF8String) null : ((UTF8String) valueArgs_0[2]);array_0[idxInVararg_0 ++] = isNullArgs_0[3] ? (UTF8String) null : ((UTF8String) valueArgs_0[3]);array_0[idxInVararg_0 ++] = isNullArgs_0[4] ? (UTF8String) null : ((UTF8String) valueArgs_0[4]);array_0[idxInVararg_0 ++] = isNullArgs_0[5] ? (UTF8String) null : ((UTF8String) valueArgs_0[5]);array_0[idxInVararg_0 ++] = isNullArgs_0[6] ? (UTF8String) null : ((UTF8String) valueArgs_0[6]);
/* 138 */     return idxInVararg_0;
/* 139 */
/* 140 */   }
/* 141 */
/* 142 */
/* 143 */   private UTF8String ConcatWs_0(InternalRow i) {
/* 144 */     boolean[] isNullArgs_0 = new boolean[32];
/* 145 */     Object[] valueArgs_0 = new Object[32];
/* 146 */     initializeArgsArrays_0_0(i, isNullArgs_0, valueArgs_0);
/* 147 */     initializeArgsArrays_0_1(i, isNullArgs_0, valueArgs_0);
/* 148 */     initializeArgsArrays_0_2(i, isNullArgs_0, valueArgs_0);
/* 149 */     initializeArgsArrays_0_3(i, isNullArgs_0, valueArgs_0);
/* 150 */     initializeArgsArrays_0_4(i, isNullArgs_0, valueArgs_0);
/* 151 */     int varargNum_0 = 30;
/* 152 */     int idxInVararg_0 = 0;
/* 153 */
/* 154 */     if (!isNullArgs_0[0]) {
/* 155 */       varargNum_0 += ((ArrayData) valueArgs_0[0]).numElements();
/* 156 */     }
/* 157 */
/* 158 */     if (!isNullArgs_0[1]) {
/* 159 */       varargNum_0 += ((ArrayData) valueArgs_0[1]).numElements();
/* 160 */     }
/* 161 */
/* 162 */     UTF8String[] array_0 = new UTF8String[varargNum_0];
/* 163 */     idxInVararg_0 = varargBuildsConcatWs_0_0(i, array_0, idxInVararg_0, isNullArgs_0, valueArgs_0);
/* 164 */     idxInVararg_0 = varargBuildsConcatWs_0_1(i, array_0, idxInVararg_0, isNullArgs_0, valueArgs_0);
/* 165 */     idxInVararg_0 = varargBuildsConcatWs_0_2(i, array_0, idxInVararg_0, isNullArgs_0, valueArgs_0);
/* 166 */     idxInVararg_0 = varargBuildsConcatWs_0_3(i, array_0, idxInVararg_0, isNullArgs_0, valueArgs_0);
/* 167 */     UTF8String value_0 = UTF8String.concatWs(((UTF8String) references[0] /* literal */), array_0);
/* 168 */     boolean isNull_0 = value_0 == null;
/* 169 */     globalIsNull_0 = isNull_0;
/* 170 */     return value_0;
/* 171 */   }
/* 172 */
/* 173 */
/* 174 */   private void initializeArgsArrays_0_2(InternalRow i, boolean [] isNullArgs_0, Object [] valueArgs_0) {
/* 175 */
/* 176 */     UTF8String value_17 = i.getUTF8String(14);
/* 177 */     isNullArgs_0[15] = false;
/* 178 */     valueArgs_0[15] = value_17;
/* 179 */
/* 180 */     UTF8String value_18 = i.getUTF8String(15);
/* 181 */     isNullArgs_0[16] = false;
/* 182 */     valueArgs_0[16] = value_18;
/* 183 */
/* 184 */     UTF8String value_19 = i.getUTF8String(16);
/* 185 */     isNullArgs_0[17] = false;
/* 186 */     valueArgs_0[17] = value_19;
/* 187 */
/* 188 */     UTF8String value_20 = i.getUTF8String(17);
/* 189 */     isNullArgs_0[18] = false;
/* 190 */     valueArgs_0[18] = value_20;
/* 191 */
/* 192 */     UTF8String value_21 = i.getUTF8String(18);
/* 193 */     isNullArgs_0[19] = false;
/* 194 */     valueArgs_0[19] = value_21;
/* 195 */
/* 196 */     UTF8String value_22 = i.getUTF8String(19);
/* 197 */     isNullArgs_0[20] = false;
/* 198 */     valueArgs_0[20] = value_22;
/* 199 */
/* 200 */     UTF8String value_23 = i.getUTF8String(20);
/* 201 */     isNullArgs_0[21] = false;
/* 202 */     valueArgs_0[21] = value_23;
/* 203 */
/* 204 */     UTF8String value_24 = i.getUTF8String(21);
/* 205 */     isNullArgs_0[22] = false;
/* 206 */     valueArgs_0[22] = value_24;
/* 207 */
/* 208 */   }
/* 209 */
/* 210 */
/* 211 */   private int varargBuildsConcatWs_0_2(InternalRow i, UTF8String [] array_0, int idxInVararg_0, boolean [] isNullArgs_0, Object [] valueArgs_0) {
/* 212 */
/* 213 */     array_0[idxInVararg_0 ++] = isNullArgs_0[18] ? (UTF8String) null : ((UTF8String) valueArgs_0[18]);array_0[idxInVararg_0 ++] = isNullArgs_0[19] ? (UTF8String) null : ((UTF8String) valueArgs_0[19]);array_0[idxInVararg_0 ++] = isNullArgs_0[20] ? (UTF8String) null : ((UTF8String) valueArgs_0[20]);array_0[idxInVararg_0 ++] = isNullArgs_0[21] ? (UTF8String) null : ((UTF8String) valueArgs_0[21]);array_0[idxInVararg_0 ++] = isNullArgs_0[22] ? (UTF8String) null : ((UTF8String) valueArgs_0[22]);array_0[idxInVararg_0 ++] = isNullArgs_0[23] ? (UTF8String) null : ((UTF8String) valueArgs_0[23]);array_0[idxInVararg_0 ++] = isNullArgs_0[24] ? (UTF8String) null : ((UTF8String) valueArgs_0[24]);array_0[idxInVararg_0 ++] = isNullArgs_0[25] ? (UTF8String) null : ((UTF8String) valueArgs_0[25]);array_0[idxInVararg_0 ++] = isNullArgs_0[26] ? (UTF8String) null : ((UTF8String) valueArgs_0[26]);array_0[idxInVararg_0 ++] = isNullArgs_0[27] ? (UTF8String) null : ((UTF8String) valueArgs_0[27]);array_0[idxInVararg_0 ++] = isNullArgs_0[28] ? (UTF8String) null : ((UTF8String) valueArgs_0[28]);
/* 214 */     return idxInVararg_0;
/* 215 */
/* 216 */   }
/* 217 */
/* 218 */
/* 219 */   private void initializeArgsArrays_0_4(InternalRow i, boolean [] isNullArgs_0, Object [] valueArgs_0) {
/* 220 */
/* 221 */     UTF8String value_33 = i.getUTF8String(30);
/* 222 */     isNullArgs_0[31] = false;
/* 223 */     valueArgs_0[31] = value_33;
/* 224 */
/* 225 */   }
/* 226 */
/* 227 */
/* 228 */   private void initializeArgsArrays_0_1(InternalRow i, boolean [] isNullArgs_0, Object [] valueArgs_0) {
/* 229 */
/* 230 */     UTF8String value_9 = i.getUTF8String(6);
/* 231 */     isNullArgs_0[7] = false;
/* 232 */     valueArgs_0[7] = value_9;
/* 233 */
/* 234 */     UTF8String value_10 = i.getUTF8String(7);
/* 235 */     isNullArgs_0[8] = false;
/* 236 */     valueArgs_0[8] = value_10;
/* 237 */
/* 238 */     UTF8String value_11 = i.getUTF8String(8);
/* 239 */     isNullArgs_0[9] = false;
/* 240 */     valueArgs_0[9] = value_11;
/* 241 */
/* 242 */     UTF8String value_12 = i.getUTF8String(9);
/* 243 */     isNullArgs_0[10] = false;
/* 244 */     valueArgs_0[10] = value_12;
/* 245 */
/* 246 */     UTF8String value_13 = i.getUTF8String(10);
/* 247 */     isNullArgs_0[11] = false;
/* 248 */     valueArgs_0[11] = value_13;
/* 249 */
/* 250 */     UTF8String value_14 = i.getUTF8String(11);
/* 251 */     isNullArgs_0[12] = false;
/* 252 */     valueArgs_0[12] = value_14;
/* 253 */
/* 254 */     UTF8String value_15 = i.getUTF8String(12);
/* 255 */     isNullArgs_0[13] = false;
/* 256 */     valueArgs_0[13] = value_15;
/* 257 */
/* 258 */     UTF8String value_16 = i.getUTF8String(13);
/* 259 */     isNullArgs_0[14] = false;
/* 260 */     valueArgs_0[14] = value_16;
/* 261 */
/* 262 */   }
/* 263 */
/* 264 */
/* 265 */   private int varargBuildsConcatWs_0_1(InternalRow i, UTF8String [] array_0, int idxInVararg_0, boolean [] isNullArgs_0, Object [] valueArgs_0) {
/* 266 */
/* 267 */     array_0[idxInVararg_0 ++] = isNullArgs_0[7] ? (UTF8String) null : ((UTF8String) valueArgs_0[7]);array_0[idxInVararg_0 ++] = isNullArgs_0[8] ? (UTF8String) null : ((UTF8String) valueArgs_0[8]);array_0[idxInVararg_0 ++] = isNullArgs_0[9] ? (UTF8String) null : ((UTF8String) valueArgs_0[9]);array_0[idxInVararg_0 ++] = isNullArgs_0[10] ? (UTF8String) null : ((UTF8String) valueArgs_0[10]);array_0[idxInVararg_0 ++] = isNullArgs_0[11] ? (UTF8String) null : ((UTF8String) valueArgs_0[11]);array_0[idxInVararg_0 ++] = isNullArgs_0[12] ? (UTF8String) null : ((UTF8String) valueArgs_0[12]);array_0[idxInVararg_0 ++] = isNullArgs_0[13] ? (UTF8String) null : ((UTF8String) valueArgs_0[13]);array_0[idxInVararg_0 ++] = isNullArgs_0[14] ? (UTF8String) null : ((UTF8String) valueArgs_0[14]);array_0[idxInVararg_0 ++] = isNullArgs_0[15] ? (UTF8String) null : ((UTF8String) valueArgs_0[15]);array_0[idxInVararg_0 ++] = isNullArgs_0[16] ? (UTF8String) null : ((UTF8String) valueArgs_0[16]);array_0[idxInVararg_0 ++] = isNullArgs_0[17] ? (UTF8String) null : ((UTF8String) valueArgs_0[17]);
/* 268 */     return idxInVararg_0;
/* 269 */
/* 270 */   }
/* 271 */
/* 272 */ }
```

### Why are the changes needed?

The generated code in `concat_ws` fails to compile when the below conditions are met:

* Plenty of columns are provided as input of `concat_ws`.
* There's at least one column with array[string] type. (In other words, not all columns are string type.)
* Splitting methods is triggered in `splitExpressionsWithCurrentInputs`.
  * This is a bit tricky, as the method won't split methods under whole stage codegen, as well as it will be simply no-op (inlined) if the number of blocks to convert into methods is 1.

a0187cd6b5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala (L88-L195)

There're three parts of generated code in `concat_ws` (`codes`, `varargCounts`, `varargBuilds`) and all parts try to split method by itself, while `varargCounts` and `varargBuilds` refer on the generated code in `codes`, hence the overall generated code fails to compile if any of part succeeds to split.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UTs added. (One for verification of the patch, another one for regression test)

Closes #28831 from HeartSaVioR/SPARK-31993.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-19 06:01:06 +00:00
Kent Yao abc8ccc37b [SPARK-31926][SQL][TESTS][FOLLOWUP][TEST-HIVE1.2][TEST-MAVEN] Fix concurrency issue for ThriftCLIService to getPortNumber
### What changes were proposed in this pull request?

This PR brings https://github.com/apache/spark/pull/28751 back

- It once reverted by 4a25200 because of inevitable maven test failure
    - See related updates in this followup a0187cd6b5

- And reverted again because of the flakiness of the added unit tests
   - In this PR, The flakiness reason found is caused by the hive metastore connection that the SparkSQLCLIService trying to create which turns out is unnecessary at all. This metastore client points to a dummy metastore server only.
   - Also, add some cleanups for SharedThriftServer trait in before and after to prevent its configurations being polluted or polluting others

### Why are the changes needed?

fix flaky test

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

passing sbt and maven tests

Closes #28835 from yaooqinn/SPARK-31926-F.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-19 05:58:54 +00:00
Yuanjian Li 86b54f3321 [SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store
### What changes were proposed in this pull request?
Introduce UnsafeRow format validation for streaming state store.

### Why are the changes needed?
Currently, Structured Streaming directly puts the UnsafeRow into StateStore without any schema validation. It's a dangerous behavior when users reusing the checkpoint file during migration. Any changes or bug fix related to the aggregate function may cause random exceptions, even the wrong answer, e.g SPARK-28067.

### Does this PR introduce _any_ user-facing change?
Yes. If the underlying changes are detected when the checkpoint is reused during migration, the InvalidUnsafeRowException will be thrown.

### How was this patch tested?
UT added. Will also add integrated tests for more scenario in another PR separately.

Closes #28707 from xuanyuanking/SPARK-31894.

Lead-authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Co-authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-19 05:56:50 +00:00
Max Gekk 17a5007fd8 [SPARK-30865][SQL][SS] Refactor DateTimeUtils
### What changes were proposed in this pull request?

1. Move TimeZoneUTC and TimeZoneGMT to DateTimeTestUtils
2. Remove TimeZoneGMT
3. Use ZoneId.systemDefault() instead of defaultTimeZone().toZoneId
4. Alias SQLDate & SQLTimestamp to internal types of DateType and TimestampType
5. Avoid one `*` `DateTimeUtils`.`in fromJulianDay()`
6. Use toTotalMonths in `DateTimeUtils`.`subtractDates()`
7. Remove `julianCommonEraStart`, `timestampToString()`, `microsToEpochDays()`, `epochDaysToMicros()`, `instantToDays()` from `DateTimeUtils`.
8. Make splitDate() private.
9. Remove `def daysToMicros(days: Int): Long` and `def microsToDays(micros: Long): Int`.

### Why are the changes needed?

This simplifies the common code related to date-time operations, and should improve maintainability. In particular:

1. TimeZoneUTC and TimeZoneGMT are moved to DateTimeTestUtils because they are used only in tests
2. TimeZoneGMT can be removed because it is equal to TimeZoneUTC
3. After the PR #27494, Spark expressions and DateTimeUtils functions switched to ZoneId instead of TimeZone completely. `defaultTimeZone()` with `TimeZone` as return type is not needed anymore.
4. SQLDate and SQLTimestamp types can be explicitly aliased to internal types of DateType and and TimestampType instead of declaring this in a comment.
5. Avoid one `*` `DateTimeUtils`.`in fromJulianDay()`.
6. Use toTotalMonths in `DateTimeUtils`.`subtractDates()`.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
By existing test suites

Closes #27617 from MaxGekk/move-time-zone-consts.

Lead-authored-by: Max Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-19 05:41:09 +00:00
Dilip Biswal e4f5036146 [SPARK-32020][SQL] Better error message when SPARK_HOME or spark.test.home is not set
### What changes were proposed in this pull request?
Better error message when SPARK_HOME or spark,test.home is not set.

### Why are the changes needed?
Currently the error message is not easily consumable as it prints  (see below) the real error after printing the current environment which is rather long.

**Old output**
`
 time.name" -> "Java(TM) SE Runtime Environment", "sun.boot.library.path" -> "/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/lib",
 "java.vm.version" -> "25.221-b11",
 . . .
 . . .
 . . .
) did not contain key "SPARK_HOME" spark.test.home or SPARK_HOME is not set.
	at org.scalatest.Assertions.newAssertionFailedExceptio
`

**New output**
An exception or error caused a run to abort: spark.test.home or SPARK_HOME is not set.
org.scalatest.exceptions.TestFailedException: spark.test.home or SPARK_HOME is not set
### Does this PR introduce any user-facing change?
`
No.

### How was this patch tested?
Ran the tests in intellej  manually to see the new error.

Closes #28825 from dilipbiswal/minor-spark-31950-followup.

Authored-by: Dilip Biswal <dkbiswal@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-06-18 22:45:55 +09:00
Max Gekk 350aa859fe [SPARK-32006][SQL] Create date/timestamp formatters once before collect in hiveResultString()
### What changes were proposed in this pull request?
1. Add method `getTimeFormatters` to `HiveResult` which creates timestamp and date formatters.
2. Move creation of `dateFormatter` and `timestampFormatter` from the constructor of the `HiveResult` object to `HiveResult. hiveResultString()` via `getTimeFormatters`. This allows to resolve time zone ID from Spark's session time zone `spark.sql.session.timeZone` and create date/timestamp formatters only once before collecting `java.sql.Timestamp`/`java.sql.Date` values.
3. Create date/timestamp formatters once in SparkExecuteStatementOperation.

### Why are the changes needed?
To fix perf regression comparing to Spark 2.4

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- By existing test suite `HiveResultSuite` and etc.
- Re-generate benchmarks results of `DateTimeBenchmark` in the environment:

| Item | Description |
| ---- | ----|
| Region | us-west-2 (Oregon) |
| Instance | r3.xlarge |
| AMI | ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5) |
| Java | OpenJDK 64-Bit Server VM 1.8.0_252 and OpenJDK 64-Bit Server VM 11.0.7+10 |

Closes #28842 from MaxGekk/opt-toHiveString-oss-master.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-17 06:28:47 +00:00
Max Gekk afd8a8b964 [SPARK-31989][SQL] Generate JSON rebasing files w/ 30 minutes step
### What changes were proposed in this pull request?
1. Change the max step from 1 week to 30 minutes in the tests `RebaseDateTimeSuite`.`generate 'gregorian-julian-rebase-micros.json'` and `generate 'julian-gregorian-rebase-micros.json'`.
2. Parallelise JSON files generation in the function `generateRebaseJson` by using `ThreadUtils.parmap`.

### Why are the changes needed?
1. To prevent the bugs that are fixed by https://github.com/apache/spark/pull/28787 and https://github.com/apache/spark/pull/28816.
2. The parallelisation speeds up JSON file generation.

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
By generating the JSON file `julian-gregorian-rebase-micros.json`.

Closes #28827 from MaxGekk/rebase-30-min.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-06-17 12:07:36 +09:00
Gabor Somogyi eeb81200e2 [SPARK-31337][SQL] Support MS SQL Kerberos login in JDBC connector
### What changes were proposed in this pull request?
When loading DataFrames from JDBC datasource with Kerberos authentication, remote executors (yarn-client/cluster etc. modes) fail to establish a connection due to lack of Kerberos ticket or ability to generate it.

This is a real issue when trying to ingest data from kerberized data sources (SQL Server, Oracle) in enterprise environment where exposing simple authentication access is not an option due to IT policy issues.

In this PR I've added MS SQL support.

What this PR contains:
* Added `MSSQLConnectionProvider`
* Added `MSSQLConnectionProviderSuite`
* Changed MS SQL JDBC driver to use the latest (test scope only)
* Changed `MsSqlServerIntegrationSuite` docker image to use the latest
* Added a version comment to `MariaDBConnectionProvider` to increase trackability

### Why are the changes needed?
Missing JDBC kerberos support.

### Does this PR introduce _any_ user-facing change?
Yes, now user is able to connect to MS SQL using kerberos.

### How was this patch tested?
* Additional + existing unit tests
* Existing integration tests
* Test on cluster manually

Closes #28635 from gaborgsomogyi/SPARK-31337.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@apache.org>
2020-06-16 18:22:12 -07:00
Takeshi Yamamuro 8d577092ed [SPARK-31705][SQL][FOLLOWUP] Avoid the unnecessary CNF computation for full-outer joins
### What changes were proposed in this pull request?

To avoid the unnecessary CNF computation for full-outer joins, this PR fixes code for filtering out full-outer joins at the entrance of the rule.

### Why are the changes needed?

To mitigate optimizer overhead.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #28810 from maropu/SPARK-31705.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Yuming Wang <wgyumg@gmail.com>
2020-06-16 09:13:00 -07:00
Max Gekk 36435658b1 [SPARK-31710][SQL][FOLLOWUP] Replace CAST by TIMESTAMP_SECONDS in benchmarks
### What changes were proposed in this pull request?
Replace `CAST(... AS TIMESTAMP` by `TIMESTAMP_SECONDS` in the following benchmarks:
- ExtractBenchmark
- DateTimeBenchmark
- FilterPushdownBenchmark
- InExpressionBenchmark

### Why are the changes needed?
The benchmarks fail w/o the changes:
```
[info] Running benchmark: datetime +/- interval
[info]   Running case: date + interval(m)
[error] Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'CAST(`id` AS TIMESTAMP)' due to data type mismatch: cannot cast bigint to timestamp,you can enable the casting by setting spark.sql.legacy.allowCastNumericToTimestamp to true,but we strongly recommend using function TIMESTAMP_SECONDS/TIMESTAMP_MILLIS/TIMESTAMP_MICROS instead.; line 1 pos 5;
[error] 'Project [(cast(cast(id#0L as timestamp) as date) + 1 months) AS (CAST(CAST(id AS TIMESTAMP) AS DATE) + INTERVAL '1 months')#2]
[error] +- Range (0, 10000000, step=1, splits=Some(1))
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the affected benchmarks.

Closes #28843 from MaxGekk/GuoPhilipse-31710-fix-compatibility-followup.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-16 14:07:03 +00:00
Max Gekk 6e9ff72195 [SPARK-31984][SQL] Make micros rebasing functions via local timestamps pure
### What changes were proposed in this pull request?
1. Set the given time zone as the first parameter of `RebaseDateTime`.`rebaseJulianToGregorianMicros()` and `rebaseGregorianToJulianMicros()` to Java 7 `GregorianCalendar`.
```scala
    val cal = new Calendar.Builder()
      // `gregory` is a hybrid calendar that supports both the Julian and Gregorian calendar systems
      .setCalendarType("gregory")
    ...
      .setTimeZone(tz)
      .build()
```
This makes the instance of the calendar independent from the default JVM time zone.

2. Change type of the first parameter from `ZoneId` to `TimeZone`. This allows to avoid unnecessary conversion from `TimeZone` to `ZoneId`, for example in
```scala
  def rebaseJulianToGregorianMicros(micros: Long): Long = {
    ...
      if (rebaseRecord == null || micros < rebaseRecord.switches(0)) {
        rebaseJulianToGregorianMicros(timeZone.toZoneId, micros)
```
and back to `TimeZone` inside of `rebaseJulianToGregorianMicros(zoneId: ZoneId, ...)`

3. Modify tests in `RebaseDateTimeSuite`, and set the default JVM time zone only for functions that depend on it.

### Why are the changes needed?
1. Ignoring passed parameter and using a global variable is bad practice.
2. Dependency from the global state doesn't allow to run the functions in parallel otherwise there is non-zero probability that the functions may return wrong result if the default JVM is changed during their execution.
3. This open opportunity for parallelisation of JSON files generation `gregorian-julian-rebase-micros.json` and `julian-gregorian-rebase-micros.json`. Currently, the tests `generate 'gregorian-julian-rebase-micros.json'` and `generate 'julian-gregorian-rebase-micros.json'` generate the JSON files by iterating over all time zones sequentially w/ step of 1 week. Due to the large step, we can miss some spikes in diffs between 2 calendars (Java 8 Gregorian and Java 7 hybrid calendars) as the PR https://github.com/apache/spark/pull/28787 fixed and https://github.com/apache/spark/pull/28816 should fix.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running existing tests from `RebaseDateTimeSuite`.

Closes #28824 from MaxGekk/pure-micros-rebasing.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-16 12:56:27 +00:00
yangjie01 d24d27f1bc [SPARK-31997][SQL][TESTS] Drop test_udtf table when SingleSessionSuite test completed
### What changes were proposed in this pull request?
`SingleSessionSuite` not do `DROP TABLE IF EXISTS test_udtf` when test completed, then if we do mvn test `HiveThriftBinaryServerSuite`, the test case named `SPARK-11595 ADD JAR with input path having URL scheme` will FAILED because it want to re-create an exists table test_udtf.

### Why are the changes needed?
test suite shouldn't rely on their execution order

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
Manual test,mvn test SingleSessionSuite and HiveThriftBinaryServerSuite in order

Closes #28838 from LuciferYang/drop-test-table.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-06-16 19:20:44 +09:00
GuoPhilipse f0e6d0ec13 [SPARK-31710][SQL] Fail casting numeric to timestamp by default
## What changes were proposed in this pull request?
we fail casting from numeric to timestamp by default.

## Why are the changes needed?
casting from numeric to timestamp is not a  non-standard,meanwhile it may generate different result between spark and other systems,for example hive

## Does this PR introduce any user-facing change?
Yes,user cannot cast numeric to timestamp directly,user have to use the following function to achieve the same effect:TIMESTAMP_SECONDS/TIMESTAMP_MILLIS/TIMESTAMP_MICROS

## How was this patch tested?
unit test added

Closes #28593 from GuoPhilipse/31710-fix-compatibility.

Lead-authored-by: GuoPhilipse <guofei_ok@126.com>
Co-authored-by: GuoPhilipse <46367746+GuoPhilipse@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-16 08:35:35 +00:00
Jungtaek Lim (HeartSaVioR) fe68e95a5a [SPARK-24634][SS][FOLLOWUP] Rename the variable from "numLateInputs" to "numRowsDroppedByWatermark"
### What changes were proposed in this pull request?

This PR renames the variable from "numLateInputs" to "numRowsDroppedByWatermark" so that it becomes self-explanation.

### Why are the changes needed?

This is originated from post-review, see https://github.com/apache/spark/pull/28607#discussion_r439853232

### Does this PR introduce _any_ user-facing change?

No, as SPARK-24634 is not introduced in any release yet.

### How was this patch tested?

Existing UTs.

Closes #28828 from HeartSaVioR/SPARK-24634-v3-followup.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-06-16 16:41:08 +09:00
Max Gekk e9145d41f3 [SPARK-31986][SQL] Fix Julian-Gregorian micros rebasing of overlapping local timestamps
### What changes were proposed in this pull request?
It fixes microseconds rebasing from the hybrid calendar (Julian + Gregorian) to Proleptic Gregorian calendar in the function `RebaseDateTime`.`rebaseJulianToGregorianMicros(zoneId: ZoneId, micros: Long): Long` in the case of local timestamp overlapping.

In the case of overlapping, we look ahead of 1 day to determinate which instant we should take - earlier or later zoned timestamp. If our current standard zone and DST offsets are equal to zone offset of the next date, we choose the later timestamp otherwise the earlier one. For example, the local timestamp **1945-11-18 01:30:00.0** can be mapped to two instants (microseconds since 1970-01-01 00:00:00Z): -761211000000000 or -761207400000000. If the first one is passed to `rebaseJulianToGregorianMicros()`, we take the earlier instant in Proleptic Gregorian calendar while rebasing **1945-11-18T01:30+09:00[Asia/Hong_Kong]** otherwise the later one **1945-11-18T01:30+08:00[Asia/Hong_Kong]**.

Note: The fix assumes that only one transition of standard or DST offsets can occur during a day.

### Why are the changes needed?
Current implementation of `rebaseJulianToGregorianMicros()` handles timestamps overlapping only during daylight saving time but overlapping can happen also during transition from one standard time zone to another one. For example in the case of `Asia/Hong_Kong`, the time zone switched from `Japan Standard Time` (UTC+9) to `Hong Kong Time` (UTC+8) on _Sunday, 18 November, 1945 01:59:59 AM_. The changes allow to handle the special case as well.

### Does this PR introduce _any_ user-facing change?
There is no behaviour change for timestamps of CE after 0001-01-01. The PR might affects timestamps of BCE for which the modified `rebaseJulianToGregorianMicros()` is called directly.

### How was this patch tested?

1. By existing tests in `DateTimeUtilsSuite`, `RebaseDateTimeSuite`, `DateFunctionsSuite`, `DateExpressionsSuite` and `TimestampFormatterSuite`.

2. Added new checks to `RebaseDateTimeSuite`.`SPARK-31959: JST -> HKT at Asia/Hong_Kong in 1945`:
```scala
      assert(rebaseJulianToGregorianMicros(hkZid, rebasedEarlierMicros) === earlierMicros)
      assert(rebaseJulianToGregorianMicros(hkZid, rebasedLaterMicros) === laterMicros)
```

3. Regenerated `julian-gregorian-rebase-micros.json` with the step of 30 minutes, and got the same JSON file. The JSON file isn't affected because previously it was generated with the step of 1 week. And the spike in diffs/switch points during 1 hour of timestamp overlapping wasn't detected.

Closes #28816 from MaxGekk/fix-overlap-julian-2-grep.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-16 06:00:05 +00:00
Dongjoon Hyun 75afd88904 Revert "[SPARK-31926][SQL][TEST-HIVE1.2][TEST-MAVEN] Fix concurrency issue for ThriftCLIService to getPortNumber"
This reverts commit a0187cd6b5.
2020-06-15 19:04:23 -07:00
Takeshi Yamamuro 3698a14204 [SPARK-26905][SQL] Follow the SQL:2016 reserved keywords
### What changes were proposed in this pull request?

This PR intends to move keywords `ANTI`, `SEMI`, and `MINUS` from reserved to non-reserved.

### Why are the changes needed?

To comply with the ANSI/SQL standard.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests.

Closes #28807 from maropu/SPARK-26905-2.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-06-16 00:27:45 +09:00
Max Gekk eae1747b66 [SPARK-31959][SQL][TESTS][FOLLOWUP] Adopt the test "SPARK-31959: JST -> HKT at Asia/Hong_Kong in 1945" to outdated tzdb
### What changes were proposed in this pull request?
Old JDK can have outdated time zone database in which `Asia/Hong_Kong` doesn't have timestamp overlapping in 1946 at all. This PR changes the test "SPARK-31959: JST -> HKT at Asia/Hong_Kong in 1945" in `RebaseDateTimeSuite`, and makes it tolerant to the case.

### Why are the changes needed?
To fix the test failures on old JDK w/ outdated tzdb like on Jenkins machine `research-jenkins-worker-09`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the test on old JDK

Closes #28832 from MaxGekk/HongKong-tz-1945-followup.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-06-15 08:09:07 -07:00
Takeshi Yamamuro 7f7b4dd519 [SPARK-31990][SS] Use toSet.toSeq in Dataset.dropDuplicates
### What changes were proposed in this pull request?

This PR partially revert SPARK-31292 in order to provide a hot-fix for a bug in `Dataset.dropDuplicates`; we must preserve the input order of `colNames` for `groupCols` because the Streaming's state store depends on the `groupCols` order.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests in `DataFrameSuite`.

Closes #28830 from maropu/SPARK-31990.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-06-15 07:48:48 -07:00
Max Gekk 9d95f1b010 [SPARK-31992][SQL] Benchmark the EXCEPTION rebase mode
### What changes were proposed in this pull request?
- Modify `DateTimeRebaseBenchmark` to benchmark the default date-time rebasing mode - `EXCEPTION` for saving/loading dates/timestamps from/to parquet files. The mode is benchmarked for modern timestamps after 1900-01-01 00:00:00Z and dates after 1582-10-15.
- Regenerate benchmark results in the environment:

| Item | Description |
| ---- | ----|
| Region | us-west-2 (Oregon) |
| Instance | r3.xlarge |
| AMI | ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5) |
| Java | OpenJDK 64-Bit Server VM 1.8.0_252 and OpenJDK 64-Bit Server VM 11.0.7+10 |

### Why are the changes needed?
The `EXCEPTION` rebasing mode is the default mode of the SQL configs `spark.sql.legacy.parquet.datetimeRebaseModeInRead` and `spark.sql.legacy.parquet.datetimeRebaseModeInWrite`. The changes are needed to improve benchmark coverage for default settings.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the benchmark and check results manually.

Closes #28829 from MaxGekk/benchmark-exception-mode.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-15 07:25:56 +00:00
Kent Yao a0187cd6b5 [SPARK-31926][SQL][TEST-HIVE1.2][TEST-MAVEN] Fix concurrency issue for ThriftCLIService to getPortNumber
### What changes were proposed in this pull request?

This PR brings 02f32cfae4 back which reverted by 4a25200cd7 because of maven test failure

diffs newly made:
1. add a missing log4j file to test resources
2. Call `SessionState.detachSession()` to clean the thread local one in `afterAll`.
3. Not use dedicated JVMs for sbt test runner too

### Why are the changes needed?

fix the maven test

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

add new tests

Closes #28797 from yaooqinn/SPARK-31926-NEW.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-15 06:10:24 +00:00
Liang-Chi Hsieh 8282bbf12d [SPARK-27633][SQL] Remove redundant aliases in NestedColumnAliasing
## What changes were proposed in this pull request?

In NestedColumnAliasing rule, we create aliases for nested field access in project list. We considered that top level parent field and nested fields under it were both accessed. In the case, we don't create the aliases because they are redundant.

There is another case, where a nested parent field and nested fields under it were both accessed, which we don't consider now. We don't need to create aliases in this case too.

## How was this patch tested?

Added test.

Closes #24525 from viirya/SPARK-27633.

Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-06-15 11:01:56 +09:00