Commit graph

6532 commits

Author SHA1 Message Date
Liang-Chi Hsieh 160c144baa [SPARK-30590][SQL] Untyped select API cannot take typed column expression that needs input type
### What changes were proposed in this pull request?

This patch proposes to throw clear analysis exception if untyped `Dataset.select` takes typed column expression that needs input type.

### Why are the changes needed?

`Dataset` provides few typed `select` helper functions to select typed column expressions. The maximum number of typed columns supported is 5. If wanting to select more than 5 typed columns, it silently calls untyped `Dataset.select` and can causes weird unresolved error, like:

```
org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [fooagg(FooAgg(1), None, None, None, input[0, int, false] AS value#114, assertnotnull(cast(value#114 as int)), input[0, int, false] AS value#113, IntegerType, IntegerType, false) AS foo_agg_1#116, fooagg(FooAgg(2), None, None, None, input[0, int, false] AS value#119, assertnotnull(cast(value#119 as int)), input[0, int, false] AS value#118, IntegerType, IntegerType, false) AS foo_agg_2#121, fooagg(FooAgg(3), None, None, None, input[0, int, false] AS value#124, assertnotnull(cast(value#124 as int)), input[0, int, false] AS value#123, IntegerType, IntegerType, false) AS foo_agg_3#126, fooagg(FooAgg(4), None, None, None, input[0, int, false] AS value#129, assertnotnull(cast(value#129 as int)), input[0, int, false] AS value#128, IntegerType, IntegerType, false) AS foo_agg_4#131, fooagg(FooAgg(5), None, None, None, input[0, int, false] AS value#134, assertnotnull(cast(value#134 as int)), input[0, int, false] AS value#133, IntegerType, IntegerType, false) AS foo_agg_5#136, fooagg(FooAgg(6), None, None, None, input[0, int, false] AS value#139, assertnotnull(cast(value#139 as int)), input[0, int, false] AS value#138, IntegerType, IntegerType, false) AS foo_agg_6#141];;
'Aggregate [fooagg(FooAgg(1), None, None, None, input[0, int, false] AS value#114, assertnotnull(cast(value#114 as int)), input[0, int, false] AS value#113, IntegerType, IntegerType, false) AS foo_agg_1#116, fooagg(FooAgg(2), None, None, None, input[0, int, false] AS value#119, assertnotnull(cast(value#119 as int)), input[0, int, false] AS value#118, IntegerType, IntegerType, false) AS foo_agg_2#121, fooagg(FooAgg(3), None, None, None, input[0, int, false] AS value#124, assertnotnull(cast(value#124 as int)), input[0, int, false] AS value#123, IntegerType, IntegerType, false) AS foo_agg_3#126, fooagg(FooAgg(4), None, None, None, input[0, int, false] AS value#129, assertnotnull(cast(value#129 as int)), input[0, int, false] AS value#128, IntegerType, IntegerType, false) AS foo_agg_4#131, fooagg(FooAgg(5), None, None, None, input[0, int, false] AS value#134, assertnotnull(cast(value#134 as int)), input[0, int, false] AS value#133, IntegerType, IntegerType, false) AS foo_agg_5#136, fooagg(FooAgg(6), None, None, None, input[0, int, false] AS value#139, assertnotnull(cast(value#139 as int)), input[0, int, false] AS value#138, IntegerType, IntegerType, false) AS foo_agg_6#141]
+- Project [_1#6 AS a#13, _2#7 AS b#14, _3#8 AS c#15, _4#9 AS d#16, _5#10 AS e#17, _6#11 AS F#18]
 +- LocalRelation [_1#6, _2#7, _3#8, _4#9, _5#10, _6#11]

at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:43)
 at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
 at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:431)
 at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:430)
 at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
 at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:430)
```

However, to fully disallow typed columns as input to untyped `select` API will break current usage like `count` that is a `TypedColumn` in `functions`. In order to keep compatibility, we should allow current usage of certain `TypedColumn`s as input to untyped `select` API. For the `TypedColumn`s that will cause unresolved exception, we should explicitly let users know that they are incorrectly calling untyped `select` with typed columns which need input type.

### Does this PR introduce any user-facing change?

Yes, but this PR only refines the error message.

When users call `Dataset.select` API with typed column that needs input type, an analysis exception will be thrown. Previously an unresolved error will be thrown.

### How was this patch tested?

Unit tests.

Closes #27499 from viirya/SPARK-30590.

Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-27 14:09:07 +08:00
Wenchen Fan eced93215f [SPARK-30918][SQL][FOLLOWUP] Fix typo in OptimizeSkewedJoin
### What changes were proposed in this pull request?

This is a follow up of #27669 in order to fix a typo.

### Why are the changes needed?

N/A

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

N/A

Closes #27714 from cloud-fan/typo.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-02-26 13:59:43 -08:00
iRakson c913b9d8b5 [SPARK-27619][SQL] MapType should be prohibited in hash expressions
### What changes were proposed in this pull request?
`hash()` and `xxhash64()` cannot be used on elements of `Maptype`. A new configuration `spark.sql.legacy.useHashOnMapType` is introduced to allow users to restore the previous behaviour.

When `spark.sql.legacy.useHashOnMapType` is set to false:

```
scala> spark.sql("select hash(map())");
org.apache.spark.sql.AnalysisException: cannot resolve 'hash(map())' due to data type mismatch: input to function hash cannot contain elements of MapType; line 1 pos 7;
'Project [unresolvedalias(hash(map(), 42), None)]
+- OneRowRelation
```

when `spark.sql.legacy.useHashOnMapType` is set to true :

```
scala> spark.sql("set spark.sql.legacy.useHashOnMapType=true");
res3: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.sql("select hash(map())").first()
res4: org.apache.spark.sql.Row = [42]

```

### Why are the changes needed?

As discussed in Jira, SparkSql's map hashcodes depends on their order of insertion which is not consistent with the normal scala behaviour which might confuse users.
Code snippet from JIRA :
```
val a = spark.createDataset(Map(1->1, 2->2) :: Nil)
val b = spark.createDataset(Map(2->2, 1->1) :: Nil)

// Demonstration of how Scala Map equality is unaffected by insertion order:
assert(Map(1->1, 2->2).hashCode() == Map(2->2, 1->1).hashCode())
assert(Map(1->1, 2->2) == Map(2->2, 1->1))
assert(a.first() == b.first())

// In contrast, this will print two different hashcodes:
println(Seq(a, b).map(_.selectExpr("hash(*)").first()))
```

Also `MapType` is prohibited for aggregation / joins / equality comparisons #7819 and set operations #17236.

### Does this PR introduce any user-facing change?
Yes. Now users cannot use hash functions on elements of `mapType`. To restore the previous behaviour set `spark.sql.legacy.useHashOnMapType` to true.

### How was this patch tested?
UT added.

Closes #27580 from iRakson/SPARK-27619.

Authored-by: iRakson <raksonrakesh@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-27 01:48:12 +08:00
Terry Kim 73305475c1 [SPARK-30782][SQL] Column resolution doesn't respect current catalog/namespace for v2 tables
### What changes were proposed in this pull request?

This PR proposes to fix an issue where qualified columns are not matched for v2 tables if current catalog/namespace are used.

For v1 tables, you can currently perform the following:
```SQL
SELECT default.t.id FROM t;
```

For v2 tables, the following fails:
```SQL
USE testcat.ns1.ns2;
SELECT testcat.ns1.ns2.t.id FROM t;

org.apache.spark.sql.AnalysisException: cannot resolve '`testcat.ns1.ns2.t.id`' given input columns: [t.id, t.point]; line 1 pos 7;
```

### Why are the changes needed?

It is a bug since qualified column names cannot match if current catalog/namespace are used.

### Does this PR introduce any user-facing change?

Yes, now the following works:
```SQL
USE testcat.ns1.ns2;
SELECT testcat.ns1.ns2.t.id FROM t;
```

### How was this patch tested?

Added new tests

Closes #27532 from imback82/qualifed_col_respect_current.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-27 00:21:38 +08:00
gatorsmile 28b8713036 [SPARK-30950][BUILD] Setting version to 3.1.0-SNAPSHOT
### What changes were proposed in this pull request?
This patch is to bump the master branch version to 3.1.0-SNAPSHOT.

### Why are the changes needed?
N/A

### Does this PR introduce any user-facing change?
N/A

### How was this patch tested?
N/A

Closes #27698 from gatorsmile/updateVersion.

Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-02-25 19:44:31 -08:00
Jungtaek Lim (HeartSaVioR) 9ea6c0a897
[SPARK-30943][SS] Show "batch ID" in tool tip string for Structured Streaming UI graphs
### What changes were proposed in this pull request?

This patch changes the tool tip string in Structured Streaming UI graphs to show batch ID (and timestamp as well) instead of only showing timestamp, which was a key for DStream but no longer a key for Structured Streaming.

This patch does some refactoring as there're some spots on confusion between js file for streaming and structured streaming.

Note that this patch doesn't actually change the x axis, as once we change it we should decouple the logic for graphs between streaming and structured streaming. It won't change UX meaningfully as in x axis we only show min and max which we still would like to know about "time" as well as batch ID.

### Why are the changes needed?

In Structured Streaming, everything is aligned for "batch ID" where the UI is only showing timestamp - end users have to manually find and correlate batch ID and the timestamp which is clearly a huge pain.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Manually tested. Screenshots:

![Screen Shot 2020-02-25 at 7 22 38 AM](https://user-images.githubusercontent.com/1317309/75197701-40b2ce80-57a2-11ea-9578-c2eb2d1091de.png)
![Screen Shot 2020-02-25 at 7 22 44 AM](https://user-images.githubusercontent.com/1317309/75197704-427c9200-57a2-11ea-9439-e0a8303d0860.png)
![Screen Shot 2020-02-25 at 7 22 58 AM](https://user-images.githubusercontent.com/1317309/75197706-43152880-57a2-11ea-9617-1276c3ba181e.png)
![Screen Shot 2020-02-25 at 7 23 04 AM](https://user-images.githubusercontent.com/1317309/75197708-43152880-57a2-11ea-9de2-7d37eaf88102.png)
![Screen Shot 2020-02-25 at 7 23 31 AM](https://user-images.githubusercontent.com/1317309/75197710-43adbf00-57a2-11ea-9ae4-4e292de39c36.png)

Closes #27687 from HeartSaVioR/SPARK-30943.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
2020-02-25 15:29:36 -08:00
Wenchen Fan 8f247e5d36 [SPARK-30918][SQL] improve the splitting of skewed partitions
### What changes were proposed in this pull request?

Use the average size of the non-skewed partitions as the target size when splitting skewed partitions, instead of ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD

### Why are the changes needed?

The goal of skew join optimization is to make the data distribution move even. So it makes more sense the use the average size of the non-skewed partitions as the target size.

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

existing tests

Closes #27669 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
2020-02-25 14:10:29 -08:00
Maxim Gekk ffc0935e64 [SPARK-30869][SQL] Convert dates to/from timestamps in microseconds precision
### What changes were proposed in this pull request?
In the PR, I propose to replace:

1. `millisToDays()` by `microsToDays()` which accepts microseconds since the epoch and returns days since the epoch in the specified time zone. The last one is the internal representation of Catalyst's DateType.
2. `daysToMillis()` by `daysToMicros()` which accepts days since the epoch in some time zone and returns the number of microseconds since the epoch. The last one is internal representation of Catalyst's TimestampType.
3. `fromMillis()` by `millisToMicros()`
4. `toMillis()` by `microsToMillis()`

### Why are the changes needed?
Spark stores timestamps in microseconds precision, so, there is no actual need to convert dates to milliseconds, and then to microseconds. As examples, look at DateTimeUtils functions `monthsBetween()` and `truncTimestamp()`.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
By existing test suites UnivocityParserSuite, DateExpressionsSuite, ComputeCurrentTimeSuite, DateTimeUtilsSuite, DateFunctionsSuite, JsonSuite, StreamSuite.

Closes #27618 from MaxGekk/replace-millis-by-micros.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-25 23:05:28 +08:00
Kent Yao 761209c1f2 [SPARK-30919][SQL] Make interval multiply and divide's overflow behavior consistent with other operations
### What changes were proposed in this pull request?

The current behavior of interval multiply and divide follows the ANSI SQL standard when overflow, it is compatible with other operations when `spark.sql.ansi.enabled` is true, but not compatible when `spark.sql.ansi.enabled` is false.

When `spark.sql.ansi.enabled` is false, as the factor is a double value, so it should use java's rounding or truncation behavior for casting double to integrals. when divided by zero, it returns `null`.  we also follow the natural rules for intervals as defined in the Gregorian calendar, so we do not add the month fraction to days but add days fraction to microseconds.

### Why are the changes needed?

Make interval multiply and divide's overflow behavior consistent with other interval operations

### Does this PR introduce any user-facing change?

no, these are new features in 3.0

### How was this patch tested?

add uts

Closes #27672 from yaooqinn/SPARK-30919.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-25 22:19:24 +08:00
Yuanjian Li e45f2c7fc0 [SPARK-28228][SQL][TESTS] Refactoring for nested CTE tests
### What changes were proposed in this pull request?
Split the nested CTE cases into a single file `cte-nested.sql`, which will be reused in cte-legacy.sql and cte-nonlegacy.sql.

### Why are the changes needed?
Make the cases easy to maintain.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing UT.

Closes #27667 from xuanyuanking/SPARK-28228-test.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-02-25 17:37:34 +09:00
Terry Kim 0fd4fa70c8 [SPARK-30885][SQL] V1 table name should be fully qualified if catalog name is provided
### What changes were proposed in this pull request?

For the following:
```
CREATE TABLE t USING json AS SELECT 1 AS i
SELECT * FROM spark_catalog.t
```
`spark_catalog.t` is resolved to `spark_catalog.default.t` assuming the current namespace is `default`. However, this is not consistent with V2 behavior where the namespace must be specified if the catalog name is provided. This PR proposes to fix this inconsistency.

### Why are the changes needed?

To be consistent with V2 table naming scheme in SQL commands.

### Does this PR introduce any user-facing change?

Yes, now the user has to specify the namespace if the catalog name is provided. For example,
```
SELECT * FROM spark_catalog.t # Will throw AnalysisException with 'Session catalog cannot have an empty namespace: spark_catalog.t'
SELECT * FROM spark_catalog.default.t # OK
```

### How was this patch tested?

Added new tests

Closes #27642 from imback82/disallow_spark_catalog_wihtout_db.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-25 13:04:28 +08:00
Shixiong Zhu 3126557b07 [SPARK-30936][CORE] Set FAIL_ON_UNKNOWN_PROPERTIES to false by default to parse Spark events
### What changes were proposed in this pull request?

Set `FAIL_ON_UNKNOWN_PROPERTIES` to `false` in `JsonProtocol` to allow ignore unknown fields in a Spark event. After this change, if we add new fields to a Spark event parsed by `ObjectMapper`, the event json string generated by a new Spark version can still be read by an old Spark History Server.

Since Spark History Server is an extra service, it usually takes time to upgrade, and it's possible that a Spark application is upgraded before SHS. Forwards-compatibility will allow an old SHS to support new Spark applications (may lose some new features but most of functions should still work).

### Why are the changes needed?

`JsonProtocol` is supposed to provide strong backwards-compatibility and forwards-compatibility guarantees: any version of Spark should be able to read JSON output written by any other version, including newer versions.

However, the forwards-compatibility guarantee is broken for events parsed by `ObjectMapper`. If a new field is added to an event parsed by `ObjectMapper` (e.g., 6dc5921e66 (diff-dc5c7a41fbb7479cef48b67eb41ad254R33)), the event json string generated by a new Spark version cannot be parsed by an old version of SHS right now.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

The new added tests.

Closes #27680 from zsxwing/SPARK-30936.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-25 12:28:31 +08:00
Peter Toth 1a4e2423b2 [SPARK-30870][SQL] Column pruning shouldn't alias a nested column if it means the whole structure
### What changes were proposed in this pull request?
This PR fixes a bug in nested column aliasing by taking the data type of the referenced nested fields into account when calculating the number of extracted columns. After this PR this query runs without issues:
```
SELECT explodedvalue.*
FROM VALUES array(named_struct('nested', named_struct('a', 1, 'b', 2))) AS (value)
LATERAL VIEW explode(value) AS explodedvalue
```
This is a regression from Spark 2.4.

### Why are the changes needed?
To fix a bug.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Added new UT.

Closes #27675 from peter-toth/SPARK-30870.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-02-24 13:46:21 -08:00
Shixiong Zhu 293e5364e5 [SPARK-30927][SS] StreamingQueryManager should avoid keeping reference to terminated StreamingQuery
### What changes were proposed in this pull request?

Right now `StreamingQueryManager` will keep the last terminated query until `resetTerminated` is called. When the last terminated query has lots of states (a large sql plan, cached RDDs, etc.), it will keep a lot of memory unnecessarily. Actually, what `StreamingQueryManager` really needs is just the exception of the last failed query.

This PR changes the internal field `lastTerminatedQuery` in `StreamingQueryManager` to remember the last exception rather than the query to save the memory.

### Why are the changes needed?

Avoid keeping memory unnecessarily.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

This PR doesn't change any public behaviors. The existing tests have covered the touched codes.

Closes #27678 from zsxwing/SPARK-30927.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-02-24 18:48:19 +09:00
beliefer 621e37e2ab [SPARK-28880][SQL] Support ANSI nested bracketed comments
### What changes were proposed in this pull request?
Spark SQL support single comments and bracketed comments now. This PR will support nested bracketed comments.

There are some mainstream database support the syntax.
**PostgreSQL:**
https://www.postgresql.org/docs/11/sql-syntax-lexical.html#SQL-SYNTAX-COMMENTS

**Vertica:**
https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/LanguageElements/Expressions/Comments.htm?zoom_highlight=comments

Note: Because Spark SQL not exists UT for single comments and bracketed comments, so I add some UT for them.

### Why are the changes needed?
nested bracketed comments is ANSI standard.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
New UT

Closes #27495 from beliefer/nested-brancket-comments.

Authored-by: beliefer <beliefer@163.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
2020-02-24 00:28:46 -08:00
Burak Yavuz 4ff2718d54 [SPARK-30924][SQL][3.0] Add additional checks to Merge Into
### What changes were proposed in this pull request?

Merge Into is currently missing additional validation around:

 1. The lack of any WHEN statements
 2. The first WHEN MATCHED statement needs to have a condition if there are two WHEN MATCHED statements.
 3. Single use of UPDATE/DELETE

This PR introduces these validations.
(1) is required, because otherwise the MERGE statement is useless.
(2) is required, because otherwise the second WHEN MATCHED condition becomes dead code
(3) is up for debate, but the idea there is that a single expression should be sufficient to specify when you would like to update or delete your records. We restrict it for now to reduce surface area and ambiguity.

### Why are the changes needed?

To ease DataSource developers when building implementations for MERGE

### Does this PR introduce any user-facing change?

Adds additional validation checks

### How was this patch tested?

Unit tests

Closes #27677 from brkyvz/mergeChecks.

Authored-by: Burak Yavuz <brkyvz@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-24 15:16:37 +08:00
jiake f4696ba252 [SPARK-30922][SQL] remove the max splits config in skewed join
### What changes were proposed in this pull request?
When skewed join optimization split more skewed readers, the plan may be very large and can not be shown in ui quickly. The config `spark.sql.adaptive.skewedJoinOptimization.skewedPartitionMaxSplits`  is to resolve the above ui shown issue. And after [PR#27493](https://github.com/apache/spark/pull/27493) combined the skewed readers into one, we not need this config.

### Why are the changes needed?
remove the unnecessary config

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
existing test

Closes #27673 from JkSelf/removeMaxSplitNum.

Authored-by: jiake <ke.a.jia@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-24 14:29:25 +08:00
Maxim Gekk c41ef39819 [SPARK-30925][SQL] Prevent overflow/round errors in conversions of milliseconds to/from microseconds
### What changes were proposed in this pull request?
- Use `Math.multiplyExact()` in `DateTimeUtils.fromMillis()` to prevent silent overflow in conversion milliseconds to microseconds.
- Use `DateTimeUtils.fromMillis()` in all places where milliseconds are converted to microseconds
- Use `DateTimeUtils.toMillis()` in all places where microseconds are converted to milliseconds

### Why are the changes needed?

1. To prevent silent arithmetic overflow while multiplying by 1000 in `fromMillis()`. Instead of it, `new ArithmeticException("long overflow")` will be thrown, and handled accordantly.
2. To correctly round microseconds in conversion to milliseconds. For example, `1965-01-01 10:11:12.123456` is represented as `-157700927876544` in micro precision. In milliseconds precision the above needs to be represented as `-157700927877` or `1965-01-01 10:11:12.123`.

### Does this PR introduce any user-facing change?
Yes

### How was this patch tested?
By `TimestampFormatterSuite`, `CastSuite`, `DateExpressionsSuite`, `IntervalExpressionsSuite`, `ExpressionParserSuite`, `ExpressionParserSuite`, `DateTimeUtilsSuite`, `IntervalUtilsSuite`

Closes #27676 from MaxGekk/millis-2-micros-overflow.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-24 14:06:25 +08:00
yi.wu 9c2eadc726 [SPARK-30844][SQL] Static partition should also follow StoreAssignmentPolicy when insert into table
### What changes were proposed in this pull request?

Make static partition also follows `StoreAssignmentPolicy` when insert into table:

if `StoreAssignmentPolicy=LEGACY`, using `Cast`;
if `StoreAssignmentPolicy=ANSI | STRIC`, using `AnsiCast`;

E.g., for the table `t` created by:

```
create table t(a int, b string) using parquet partitioned by (a)
```
and insert values with `StoreAssignmentPolicy=ANSI` using:
```
insert into t partition(a='ansi') values('ansi')
```

Before this PR:

```
+----+----+
|   b|   a|
+----+----+
|ansi|null|
+----+----+
```

After this PR, insert will fail by:
```
java.lang.NumberFormatException: invalid input syntax for type numeric: ansi
```

(It should be better if we could use `TableOutputResolver.checkField` to fully follow `StoreAssignmentPolicy`. But since we lost the data type of static partition's value at first place, it's hard to use `TableOutputResolver.checkField`.)

### Why are the changes needed?

I think we should follow `StoreAssignmentPolicy` when insert into table for any columns, including static partition.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added new test.

Closes #27597 from Ngone51/fix-static-partition.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-02-23 17:46:19 +09:00
yi.wu 25f5bfaa6e [SPARK-30903][SQL] Fail fast on duplicate columns when analyze columns
<!--
Thanks for sending a pull request!  Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
  2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
  4. Be sure to keep the PR description updated to reflect all changes.
  5. Please write your PR title to summarize what this PR proposes.
  6. If possible, provide a concise example to reproduce the issue for a faster review.
  7. If you want to add a new configuration, please read the guideline first for naming configurations in
     'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
-->

### What changes were proposed in this pull request?
<!--
Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue.
If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
  1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
  2. If you fix some SQL features, you can provide some references of other DBMSes.
  3. If there is design documentation, please add the link.
  4. If there is a discussion in the mailing list, please add the link.
-->

Add new `CommandCheck` rule and fail fast when detects duplicate columns in `AnalyzeColumnCommand`.

### Why are the changes needed?
<!--
Please clarify why the changes are needed. For instance,
  1. If you propose a new API, clarify the use case for a new API.
  2. If you fix a bug, you can clarify why it is a bug.
-->

To avoid duplicate statistics computation for the same column in `AnalyzeColumnCommand`.

### Does this PR introduce any user-facing change?
<!--
If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
If no, write 'No'.
-->

Yes. User now get exception when input duplicate columns.

### How was this patch tested?
<!--
If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
If tests were not added, please describe why they were not added and/or why it was difficult to add.
-->

Added new test.

Closes #27651 from Ngone51/fail_on_dup_cols.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-02-23 09:52:54 +09:00
beliefer 59d6d5cbb0 [SPARK-30840][CORE][SQL] Add version property for ConfigEntry and ConfigBuilder
### What changes were proposed in this pull request?
Spark `ConfigEntry` and `ConfigBuilder` missing Spark version information of each configuration at release. This is not good for Spark user when they visiting the page of spark configuration.
http://spark.apache.org/docs/latest/configuration.html
The new Spark SQL config docs looks like:
![sql配置截屏](https://user-images.githubusercontent.com/8486025/74604522-cb882f00-50f9-11ea-8683-57a90f9e3347.png)

```
> SET -v
spark.sql.adaptive.enabled      false   When true, enable adaptive query execution.
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin       0.2     The relation with a non-empty partition ratio lower than this config will not be considered as the build side of a broadcast-hash join in adaptive execution regardless of its size.This configuration only has an effect when 'spark.sql.adaptive.enabled' is enabled.
spark.sql.adaptive.optimizeSkewedJoin.enabled   true    When true and adaptive execution is enabled, a skewed join is automatically handled at runtime.
spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionFactor     10      A partition is considered as a skewed partition if its size is larger than this factor multiple the median partition size and also larger than  spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionSizeThreshold
spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionMaxSplits  5       Configures the maximum number of task to handle a skewed partition in adaptive skewedjoin.
spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionSizeThreshold      64MB    Configures the minimum size in bytes for a partition that is considered as a skewed partition in adaptive skewed join.
spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled    true    Whether to fetch the continuous shuffle blocks in batch. Instead of fetching blocks one by one, fetching continuous shuffle blocks for the same map task in batch can reduce IO and improve performance. Note, multiple continuous blocks exist in single fetch request only happen when 'spark.sql.adaptive.enabled' and 'spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled' is enabled, this feature also depends on a relocatable serializer, the concatenation support codec in use and the new version shuffle fetch protocol.
spark.sql.adaptive.shuffle.localShuffleReader.enabled   true    When true and 'spark.sql.adaptive.enabled' is enabled, this enables the optimization of converting the shuffle reader to local shuffle reader for the shuffle exchange of the broadcast hash join in probe side.
spark.sql.adaptive.shuffle.maxNumPostShufflePartitions  <undefined>     The advisory maximum number of post-shuffle partitions used in adaptive execution. This is used as the initial number of pre-shuffle partitions. By default it equals to spark.sql.shuffle.partitions. This configuration only has an effect when 'spark.sql.adaptive.enabled' and 'spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled' is enabled.
```

**Note**: Because there are so many configuration items that are exposed and require a lot of finishing, I will add the version numbers of these configuration items in another PR.

### Why are the changes needed?
Supplemental configuration version information.

### Does this PR introduce any user-facing change?
Yes

### How was this patch tested?
Exists UT

Closes #27592 from beliefer/add-version-to-config.

Authored-by: beliefer <beliefer@163.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-02-22 09:46:42 +09:00
Eric Wu 1f0300fb16 [SPARK-30764][SQL] Improve the readability of EXPLAIN FORMATTED style
### What changes were proposed in this pull request?
The style of `EXPLAIN FORMATTED` output needs to be improved. We’ve already got some observations/ideas in
https://github.com/apache/spark/pull/27368#discussion_r376694496
https://github.com/apache/spark/pull/27368#discussion_r376927143

Observations/Ideas:
1. Using comma as the separator is not clear, especially commas are used inside the expressions too.
2. Show the column counts first? For example, `Results [4]: …`
3. Currently the attribute names are automatically generated, this need to refined.
4. Add arguments field in common implementations as `EXPLAIN EXTENDED` did by calling `argString` in `TreeNode.simpleString`. This will eliminate most existing minor differences between
`EXPLAIN EXTENDED` and `EXPLAIN FORMATTED`.
5. Another improvement we can do is: the generated alias shouldn't include attribute id. collect_set(val, 0, 0)#123 looks clearer than collect_set(val#456, 0, 0)#123

This PR is currently addressing comments 2 & 4, and open for more discussions on improving readability.

### Why are the changes needed?
The readability of `EXPLAIN FORMATTED` need to be improved, which will help user better understand the query plan.

### Does this PR introduce any user-facing change?
Yes, `EXPLAIN FORMATTED` output style changed.

### How was this patch tested?
Update expect results of test cases in explain.sql

Closes #27509 from Eric5553/ExplainFormattedRefine.

Authored-by: Eric Wu <492960551@qq.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-21 23:36:14 +08:00
maryannxue 6058ce97b9 [SPARK-30906][SQL] Turning off AQE in CacheManager is not thread-safe
### What changes were proposed in this pull request?
This PR aims to fix the thread-safety issue in turning off AQE for CacheManager by cloning the current session and changing the AQE conf on the cloned session.
This PR also adds a utility function for cloning the session with AQE disabled conf value, which can be shared by another caller.

### Why are the changes needed?
To fix the potential thread-unsafe problem.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Manually tested CachedTableSuite with AQE settings enabled.

Closes #27659 from maryannxue/spark-30906.

Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-21 22:49:20 +08:00
Yuanjian Li a5efbb284e [SPARK-30809][SQL] Review and fix issues in SQL API docs
### What changes were proposed in this pull request?
- Add missing `since` annotation.
- Don't show classes under `org.apache.spark.sql.dynamicpruning` package in API docs.
- Fix the scope of `xxxExactNumeric` to remove it from the API docs.

### Why are the changes needed?
Avoid leaking APIs unintentionally in Spark 3.0.0.

### Does this PR introduce any user-facing change?
No. All these changes are to avoid leaking APIs unintentionally in Spark 3.0.0.

### How was this patch tested?
Manually generated the API docs and verified the above issues have been fixed.

Closes #27560 from xuanyuanking/SPARK-30809.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-21 17:03:22 +08:00
yi.wu 82ce4753aa [SPARK-26580][SQL][ML][FOLLOW-UP] Throw exception when use untyped UDF by default
### What changes were proposed in this pull request?

This PR proposes to throw exception by default when user use untyped UDF(a.k.a `org.apache.spark.sql.functions.udf(AnyRef, DataType)`).

And user could still use it by setting `spark.sql.legacy.useUnTypedUdf.enabled` to `true`.

### Why are the changes needed?

According to #23498, since Spark 3.0, the untyped UDF will return the default value of the Java type if the input value is null. For example, `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` will  return 0 in Spark 3.0 but null in Spark 2.4. And the behavior change is introduced due to Spark3.0 is built with Scala 2.12 by default.

As a result, this might change data silently and may cause correctness issue if user still expect `null` in some cases. Thus, we'd better to encourage user to use typed UDF to avoid this problem.

### Does this PR introduce any user-facing change?

Yeah. User will hit exception now when use untyped UDF.

### How was this patch tested?

Added test and updated some tests.

Closes #27488 from Ngone51/spark_26580_followup.

Lead-authored-by: yi.wu <yi.wu@databricks.com>
Co-authored-by: wuyi <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-21 14:46:54 +08:00
wuyi 5eb004f4bb Revert "[SPARK-28093][SQL] Fix TRIM/LTRIM/RTRIM function parameter order issue"
### What changes were proposed in this pull request?

This reverts commit bef5d9d6c3.

### Why are the changes needed?

Revert it according to https://github.com/apache/spark/pull/24902#issuecomment-584511167.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass Jenkins.

Closes #27540 from Ngone51/revert_spark_28093.

Lead-authored-by: wuyi <yi.wu@databricks.com>
Co-authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-21 12:55:32 +08:00
Maxim Gekk a551715fd2 [SPARK-29930][SPARK-30416][SQL][FOLLOWUP] Move deprecated/removed config checks from RuntimeConfig to SQLConf
### What changes were proposed in this pull request?
- Output warnings for deprecated SQL configs in `SQLConf. setConfWithCheck()` and in `SQLConf. unsetConf()`
- Throw an exception for removed SQL configs in `SQLConf. setConfWithCheck()` when they set to non-default values
- Remove checking of deprecated and removed SQL configs from RuntimeConfig

### Why are the changes needed?
Currently, warnings/exceptions are printed only when a SQL config is set dynamically, for instance via `spark.conf.set()`. After the changes, removed/deprecated SQL configs will be checked when they set statically. For example:
```
$ bin/spark-shell --conf spark.sql.fromJsonForceNullableSchema=false
scala> spark.emptyDataFrame
java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':
...
Caused by: org.apache.spark.sql.AnalysisException: The SQL config 'spark.sql.fromJsonForceNullableSchema' was removed in the version 3.0.0. It was removed to prevent errors like SPARK-23173 for non-default value.
```
```
$ bin/spark-shell --conf spark.sql.hive.verifyPartitionPath=false
scala> spark.emptyDataFrame
20/02/20 02:10:26 WARN SQLConf: The SQL config 'spark.sql.hive.verifyPartitionPath' has been deprecated in Spark v3.0 and may be removed in the future. This config is replaced by 'spark.files.ignoreMissingFiles'.
```

### Does this PR introduce any user-facing change?
Yes

### How was this patch tested?
By `SQLConfSuite`

Closes #27645 from MaxGekk/remove-sql-configs-followup-2.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-21 00:00:48 +08:00
Wenchen Fan 704d249a56 [SPARK-26071][FOLLOWUP] Improve migration guide of disallowing map type map key
### What changes were proposed in this pull request?

mention the workaround if users do want to use map type as key, and add a test to demonstrate it.

### Why are the changes needed?

it's better to provide an alternative when we ban something.

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

N/A

Closes #27621 from cloud-fan/map.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-20 22:10:04 +08:00
herman c92d437c46 [SPARK-30811][SQL] CTE should not cause stack overflow when it refers to non-existent table with same name
### Why are the changes needed?
This ports the tests introduced in 7285eea683 to master to avoid future regressions.

### Background
A query with Common Table Expressions can cause a stack overflow when it contains a CTE that refers a non-existing table with the same name. The name of the table need to have a database qualifier. This is caused by a couple of things:

- CTESubstitution runs analysis on the CTE, but this does not throw an exception because the table has a database qualifier. The reason is that we don't fail is because we re-attempt to resolve the relation in a later rule;
- CTESubstitution replace logic does not check if the table it is replacing has a database, it shouldn't replace the relation if it does. So now we will happily replace nonexist.t with t;

Note that this not an issue for master or the spark-3.0 branch.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Added regression test to `AnalysisErrorSuite` and `DataFrameSuite`.

Closes #27635 from hvanhovell/SPARK-30811-master.

Authored-by: herman <herman@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2020-02-19 10:17:46 -08:00
LantaoJin c0715221b2 [SPARK-30785][SQL] Create table like should keep tracksPartitionsInCatalog same with source table
### What changes were proposed in this pull request?
Table generated by `CREATE TABLE LIKE` a partitioned table is a partitioned table. But when run `ALTER TABLE ADD PARTITION`, it will throw `AnalysisException: ALTER TABLE ADD PARTITION is not allowed`. That's because the default value of `tracksPartitionsInCatalog` from `CREATE TABLE LIKE` always is false.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Add a unit test.

Closes #27538 from LantaoJin/SPARK-30785.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-19 15:05:34 +08:00
beliefer 0894dbab2c [MINOR][SQL] Improve readability for window execution
### What changes were proposed in this pull request?
I read the comments of `WindowExec` and found some comment will cause confusion and another need to improve.

### Why are the changes needed?
This PR will enhance the readability and let developer works more easy

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
No need

Closes #27431 from beliefer/improve-window-readability.

Authored-by: beliefer <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-19 14:26:27 +08:00
Wenchen Fan 1b67d546bd revert SPARK-29663 and SPARK-29688
### What changes were proposed in this pull request?

This PR reverts https://github.com/apache/spark/pull/26325 and https://github.com/apache/spark/pull/26347

### Why are the changes needed?

When we do sum/avg, we need a wider type of input to hold the sum value, to reduce the possibility of overflow. For example, we use long to hold the sum of integral inputs, use double to hold the sum of float/double.

However, we don't have a wider type of interval. Also the semantic is unclear: what if the days field overflows but the months field doesn't? Currently the avg of `1 month` and `2 month` is `1 month 15 days`, which assumes 1 month has 30 days and we should avoid this assumption.

### Does this PR introduce any user-facing change?

yes, remove 2 features added in 3.0

### How was this patch tested?

N/A

Closes #27619 from cloud-fan/revert.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: herman <herman@databricks.com>
2020-02-18 21:19:57 +01:00
yi.wu 643a480b11 [SPARK-30863][SQL] Distinguish Cast and AnsiCast in toString
### What changes were proposed in this pull request?

Prefix by `ansi_`  in `toString` if it's a `AnsiCast` or ansi enabled `Cast`.

E.g. run `spark.sql("select cast('51' as int)").queryExecution.analyzed` under ansi mode.

Before this PR:
```
Project [cast(51 as int) AS CAST(51 AS INT)#0]
+- OneRowRelation
```

After this PR:
```
Project [ansi_cast(51 as int) AS CAST(51 AS INT)#0]
+- OneRowRelation
```

### Why are the changes needed?

This is useful while comparing `LogicalPlan`s literally.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass Jenkins.

Closes #27608 from Ngone51/ansi_cast_tostring.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-18 16:10:43 +08:00
Terry Kim 5866bc77d7 [SPARK-30814][SQL] ALTER TABLE ... ADD COLUMN position should be able to reference columns being added
### What changes were proposed in this pull request?

In ALTER TABLE, a column in ADD COLUMNS can depend on the position of a column that is just being added. For example, for a table with the following schema:
```
root:
  - a: string
  - b: long
```
, the following should work:
```
ALTER TABLE t ADD COLUMNS (x int AFTER a, y int AFTER x)
```
Currently, the above statement will throw an exception saying that AFTER x cannot be resolved, because x doesn't exist yet. This PR proposes to fix this issue.

### Why are the changes needed?

To fix a bug described above.

### Does this PR introduce any user-facing change?

Yes, now
```
ALTER TABLE t ADD COLUMNS (x int AFTER a, y int AFTER x)
```
works as expected.

### How was this patch tested?

Added new tests

Closes #27584 from imback82/alter_table_pos_fix.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-18 13:01:45 +08:00
Liang Zhang d8c0599e54 [SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'sementicHash' methods in Dataset
### What changes were proposed in this pull request?
This PR added two DeveloperApis to the Dataset[T] class. Both methods are just exposing lower-level methods to the Dataset[T] class.

### Why are the changes needed?
They are useful for checking whether two dataframes are the same when implementing dataframe caching in python, and also get a unique ID. It's easier to use if we wrap the lower-level APIs.

### Does this PR introduce any user-facing change?
```
scala> val df1 = Seq((1,2),(4,5)).toDF("col1", "col2")
df1: org.apache.spark.sql.DataFrame = [col1: int, col2: int]

scala> val df2 = Seq((1,2),(4,5)).toDF("col1", "col2")
df2: org.apache.spark.sql.DataFrame = [col1: int, col2: int]

scala> val df3 = Seq((0,2),(4,5)).toDF("col1", "col2")
df3: org.apache.spark.sql.DataFrame = [col1: int, col2: int]

scala> val df4 = Seq((0,2),(4,5)).toDF("col0", "col2")
df4: org.apache.spark.sql.DataFrame = [col0: int, col2: int]

scala> df1.semanticHash
res0: Int = 594427822

scala> df2.semanticHash
res1: Int = 594427822

scala> df1.sameSemantics(df2)
res2: Boolean = true

scala> df1.sameSemantics(df3)
res3: Boolean = false

scala> df3.semanticHash
res4: Int = -1592702048

scala> df4.semanticHash
res5: Int = -1592702048

scala> df4.sameSemantics(df3)
res6: Boolean = true
```

### How was this patch tested?
Unit test in scala and doctest in python.

Note: comments are copied from the corresponding lower-level APIs.
Note: There are some issues to be fixed that would improve the hash collision rate: https://github.com/apache/spark/pull/27565#discussion_r379881028

Closes #27565 from liangz1/df-same-result.

Authored-by: Liang Zhang <liang.zhang@databricks.com>
Signed-off-by: WeichenXu <weichen.xu@databricks.com>
2020-02-18 09:22:26 +08:00
Ajith 657d151395 [SPARK-29174][SQL] Support LOCAL in INSERT OVERWRITE DIRECTORY to data source
### What changes were proposed in this pull request?
`INSERT OVERWRITE LOCAL DIRECTORY` is supported with ensuring the provided path is always using `file://` as scheme and removing the check which throws exception if we do insert overwrite by mentioning directory with `LOCAL` syntax

### Why are the changes needed?
without the modification in PR, ``` insert overwrite local directory <location> using ```

throws exception

```
Error: org.apache.spark.sql.catalyst.parser.ParseException:

LOCAL is not supported in INSERT OVERWRITE DIRECTORY to data source(line 1, pos 0)
```
which was introduced in https://github.com/apache/spark/pull/18975, but this restriction is not needed, hence dropping the same.
Keep behaviour consistent for local and remote file-system in  `INSERT OVERWRITE DIRECTORY`

### Does this PR introduce any user-facing change?
Yes, after this change `INSERT OVERWRITE LOCAL DIRECTORY` will not throw exception

### How was this patch tested?
Added UT

Closes #27039 from ajithme/insertoverwrite2.

Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-02-18 09:42:31 +09:00
Ajith 2854091d12 [SPARK-22590][SQL] Copy sparkContext.localproperties to child thread in BroadcastExchangeExec.executionContext
### What changes were proposed in this pull request?
In `org.apache.spark.sql.execution.exchange.BroadcastExchangeExec#relationFuture` make a copy of `org.apache.spark.SparkContext#localProperties` and pass it to the broadcast execution thread in `org.apache.spark.sql.execution.exchange.BroadcastExchangeExec#executionContext`

### Why are the changes needed?
When executing `BroadcastExchangeExec`, the relationFuture is evaluated via a separate thread. The threads inherit the `localProperties` from `sparkContext` as they are the child threads.
These threads are created in the executionContext (thread pools). Each Thread pool has a default `keepAliveSeconds` of 60 seconds for idle threads.
Scenarios where the thread pool has threads which are idle and reused for a subsequent new query, the thread local properties will not be inherited from spark context (thread properties are inherited only on thread creation) hence end up having old or no properties set. This will cause taskset properties to be missing when properties are transferred by child thread via `sparkContext.runJob/submitJob`

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Added UT

Closes #27266 from ajithme/broadcastlocalprop.

Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-18 02:26:52 +08:00
Maxim Gekk afaeb29599 [SPARK-30808][SQL] Enable Java 8 time API in Thrift server
### What changes were proposed in this pull request?
- Set `spark.sql.datetime.java8API.enabled` to `true` in `hiveResultString()`, and restore it back at the end of the call.
- Convert collected `java.time.Instant` & `java.time.LocalDate` to `java.sql.Timestamp` and `java.sql.Date` for correct formatting.

### Why are the changes needed?
Because of textual representation of timestamps/dates before 1582 year is incorrect:
```shell
$ export TZ="America/Los_Angeles"
$ ./bin/spark-sql -S
```
```sql
spark-sql> set spark.sql.session.timeZone=America/Los_Angeles;
spark.sql.session.timeZone	America/Los_Angeles
spark-sql> SELECT DATE_TRUNC('MILLENNIUM', DATE '1970-03-20');
1001-01-01 00:07:02
```
It must be 1001-01-01 00:**00:00**.

### Does this PR introduce any user-facing change?
Yes. After the changes:
```shell
$ export TZ="America/Los_Angeles"
$ ./bin/spark-sql -S
```
```sql
spark-sql> set spark.sql.session.timeZone=America/Los_Angeles;
spark.sql.session.timeZone	America/Los_Angeles
spark-sql> SELECT DATE_TRUNC('MILLENNIUM', DATE '1970-03-20');
1001-01-01 00:00:00
```

### How was this patch tested?
By running hive-thiftserver tests. In particular:
```
./build/sbt -Phadoop-2.7 -Phive-2.3 -Phive-thriftserver "hive-thriftserver/test:testOnly *SparkThriftServerProtocolVersionsSuite"
```

Closes #27552 from MaxGekk/hive-thriftserver-java8-time-api.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-18 02:15:44 +08:00
Yuanjian Li 5ffc5ff55e [SPARK-11150][SQL][FOLLOWUP] Move sql/dynamicpruning to sql/execution/dynamicpruning
### What changes were proposed in this pull request?
Follow-up work for #25600. In this PR, we move `sql/dynamicpruning` to `sql/execution/dynamicpruning`.

### Why are the changes needed?
Fix the unexpected public APIs in 3.0.0 #27560.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing UT.

Closes #27581 from xuanyuanking/SPARK-11150-follow.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-18 01:44:14 +08:00
wangguangxin.cn 0ae3ff60c4 [SPARK-30806][SQL] Evaluate once per group in UnboundedWindowFunctionFrame
### What changes were proposed in this pull request?
We only need to do aggregate evaluation once per group in `UnboundedWindowFunctionFrame`

### Why are the changes needed?
Currently, in `UnboundedWindowFunctionFrame.write`,it re-evaluate the processor for each row in a group, which is not necessary in fact which I'll address later. It hurts performance when the evaluation is time-consuming (for example, Percentile's eval need to sort its buffer and do some calculation). In our production, there is a percentile with window operation sql,  it costs more than 10 hours in SparkSQL while 10min in Hive.

In fact, `UnboundedWindowFunctionFrame` can be treated as `SlidingWindowFunctionFrame` with `lbound = UnboundedPreceding` and `ubound = UnboundedFollowing`, just as its comments. In that case, `SlidingWindowFunctionFrame` also only do evaluation once for each group.

The performance issue can be reproduced by running the follow scripts in local spark-shell
```
spark.range(100*100).map(i => (i, "India")).toDF("uv", "country").createOrReplaceTempView("test")
sql("select uv, country, percentile(uv, 0.95) over (partition by country) as ptc95 from test").collect.foreach(println)
```
Before this patch, the sql costs **128048 ms**.
With this patch,  the sql costs **3485 ms**.

If we increase the data size to 1000*1000 for example, then spark cannot even produce result without this patch(I'v waited for several hours).

### Does this PR introduce any user-facing change?
NO

### How was this patch tested?
Existing UT

Closes #27558 from WangGuangxin/windows.

Authored-by: wangguangxin.cn <wangguangxin.cn@gmail.com>
Signed-off-by: herman <herman@databricks.com>
2020-02-17 18:15:54 +01:00
Yuanjian Li e4a541b278 [SPARK-30829][SQL] Define LegacyBehaviorPolicy enumeration as the common value for result change configs
### What changes were proposed in this pull request?
Define a new enumeration `LegacyBehaviorPolicy` in SQLConf, it will be used as the common value for result change configs.

### Why are the changes needed?
During API auditing for the 3.0 release, we found several new approaches that will change the results silently. For these features, we need a common three-value config.

### Does this PR introduce any user-facing change?
Yes, original config `spark.sql.legacy.ctePrecedence.enabled` change to `spark.sql.legacy.ctePrecedencePolicy`.

### How was this patch tested?
Existing UT.

Closes #27579 from xuanyuanking/SPARK-30829.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-18 00:52:05 +08:00
Arwin Tio 25e9156bc0 [SPARK-29089][SQL] Parallelize blocking FileSystem calls in DataSource#checkAndGlobPathIfNecessary
### What changes were proposed in this pull request?
See JIRA: https://issues.apache.org/jira/browse/SPARK-29089
Mailing List: http://apache-spark-developers-list.1001551.n3.nabble.com/DataFrameReader-bottleneck-in-DataSource-checkAndGlobPathIfNecessary-when-reading-S3-files-td27828.html

When using DataFrameReader#csv to read many files on S3, globbing and fs.exists on DataSource#checkAndGlobPathIfNecessary becomes a bottleneck.

From the mailing list discussions, an improvement that can be made is to parallelize the blocking FS calls:

> - have SparkHadoopUtils differentiate between files returned by globStatus(), and which therefore exist, and those which it didn't glob for -it will only need to check those.
> - add parallel execution to the glob and existence checks

### Why are the changes needed?

Verifying/globbing files happens on the driver, and if this operations take a long time (for example against S3), then the entire cluster has to wait, potentially sitting idle. This change hopes to make this process faster.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

I added a test suite `DataSourceSuite` - open to suggestions for better naming.

See [here](https://github.com/apache/spark/pull/25899#issuecomment-534380034) and [here](https://github.com/apache/spark/pull/25899#issuecomment-534069194) for some measurements

Closes #25899 from cozos/master.

Lead-authored-by: Arwin Tio <Arwin.tio@adroll.com>
Co-authored-by: Arwin Tio <arwin.tio@hotmail.com>
Co-authored-by: Arwin Tio <arwin.tio@adroll.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2020-02-17 09:30:35 -06:00
Maxim Gekk 06217cfded [SPARK-30793][SQL] Fix truncations of timestamps before the epoch to minutes and seconds
### What changes were proposed in this pull request?
In the PR, I propose to replace `%` by `Math.floorMod` in `DateTimeUtils.truncTimestamp` for the `SECOND` and `MINUTE` levels.

### Why are the changes needed?
This fixes the issue of incorrect truncation of timestamps before the epoch `1970-01-01T00:00:00.000000Z` to the `SECOND` and `MINUTE` levels. For example, timestamps after the epoch are truncated by cutting off the rest part of the timestamp:
```sql
spark-sql> select date_trunc('SECOND', '2020-02-11 00:01:02.123');
2020-02-11 00:01:02
```
but seconds in the truncated timestamp before the epoch are increased by 1:
```sql
spark-sql> select date_trunc('SECOND', '1960-02-11 00:01:02.123');
1960-02-11 00:01:03
```

### Does this PR introduce any user-facing change?
Yes. After the changes, the example above outputs correct result:
```sql
spark-sql> select date_trunc('SECOND', '1960-02-11 00:01:02.123');
1960-02-11 00:01:02
```

### How was this patch tested?
Added new tests to `DateFunctionsSuite`.

Closes #27543 from MaxGekk/fix-second-minute-truc.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-17 22:51:56 +08:00
Yuanjian Li ab186e3659 [SPARK-25829][SQL] Add config spark.sql.legacy.allowDuplicatedMapKeys and change the default behavior
### What changes were proposed in this pull request?
This is a follow-up for #23124, add a new config `spark.sql.legacy.allowDuplicatedMapKeys` to control the behavior of removing duplicated map keys in build-in functions. With the default value `false`, Spark will throw a RuntimeException while duplicated keys are found.

### Why are the changes needed?
Prevent silent behavior changes.

### Does this PR introduce any user-facing change?
Yes, new config added and the default behavior for duplicated map keys changed to RuntimeException thrown.

### How was this patch tested?
Modify existing UT.

Closes #27478 from xuanyuanking/SPARK-25892-follow.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-17 22:06:58 +08:00
Maxim Gekk 9107f77f15 [SPARK-30843][SQL] Fix getting of time components before 1582 year
### What changes were proposed in this pull request?

1. Rewrite DateTimeUtils methods `getHours()`, `getMinutes()`, `getSeconds()`, `getSecondsWithFraction()`, `getMilliseconds()` and `getMicroseconds()` using Java 8 time APIs. This will automatically switch the `Hour`, `Minute`, `Second` and `DatePart` expressions on Proleptic Gregorian calendar.
2. Remove unused methods and constant of DateTimeUtils - `to2001`, `YearZero `, `toYearZero` and `absoluteMicroSecond()`.
3. Remove unused value `timeZone` from `TimeZoneAwareExpression` since all expressions have been migrated to Java 8 time API, and legacy instance of `TimeZone` is not needed any more.
4. Change signatures of modified DateTimeUtils methods, and pass `ZoneId` instead of `TimeZone`. This will allow to avoid unnecessary conversions `TimeZone` -> `String` -> `ZoneId`.
5. Modify tests in `DateTimeUtilsSuite` and in `DateExpressionsSuite` to pass `ZoneId` instead of `TimeZone`. Correct the tests, to pass tested zone id instead of None.

### Why are the changes needed?
The changes fix the issue of wrong results returned by the `hour()`, `minute()`, `second()`, `date_part('millisecond', ...)` and `date_part('microsecond', ....)`, see example in [SPARK-30843](https://issues.apache.org/jira/browse/SPARK-30843).

### Does this PR introduce any user-facing change?
Yes. After the changes, the results of examples from SPARK-30843:
```sql
spark-sql> select hour(timestamp '0010-01-01 00:00:00');
0
spark-sql> select minute(timestamp '0010-01-01 00:00:00');
0
spark-sql> select second(timestamp '0010-01-01 00:00:00');
0
spark-sql> select date_part('milliseconds', timestamp '0010-01-01 00:00:00');
0.000
spark-sql> select date_part('microseconds', timestamp '0010-01-01 00:00:00');
0
```

### How was this patch tested?
- By existing test suites `DateTimeUtilsSuite`, `DateExpressionsSuite` and `DateFunctionsSuite`.
- Add new tests to `DateExpressionsSuite` and `DateTimeUtilsSuite` for 10 year, like:
```scala
  input = date(10, 1, 1, 0, 0, 0, 0, zonePST)
  assert(getHours(input, zonePST) === 0)
```
- Re-run `DateTimeBenchmark` using Amazon EC2.

| Item | Description |
| ---- | ----|
| Region | us-west-2 (Oregon) |
| Instance | r3.xlarge |
| AMI | ami-06f2f779464715dc5 (ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1) |
| Java | OpenJDK8/11 |

Closes #27596 from MaxGekk/localtimestamp-greg-cal.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Max Gekk <max.gekk@gmail.com>
Co-authored-by: Ubuntu <ubuntu@ip-172-31-1-30.us-west-2.compute.internal>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-17 13:59:21 +08:00
Wenchen Fan ab07c6300c [SPARK-30799][SQL] "spark_catalog.t" should not be resolved to temp view
### What changes were proposed in this pull request?

No v2 command supports temp views and the `ResolveCatalogs`/`ResolveSessionCatalog` framework is designed with this assumption.

However, `ResolveSessionCatalog` needs to fallback to v1 commands, which do support temp views (e.g. CACHE TABLE). To work around it, we add a hack in `CatalogAndIdentifier`, which does not expand the given identifier with current namespace if the catalog is session catalog.

This works fine in most cases, as temp views should take precedence over tables during lookup. So if `CatalogAndIdentifier` returns a single name "t", the v1 commands can still resolve it to temp views correctly, or resolve it to table "default.t" if temp view doesn't exist.

However, if users write `spark_catalog.t`, it shouldn't be resolved to temp views as temp views don't belong to any catalog. `CatalogAndIdentifier` can't distinguish between `spark_catalog.t` and `t`, so the caller side may mistakenly resolve `spark_catalog.t` to a temp view.

This PR proposes to fix this issue by
1. remove the hack in `CatalogAndIdentifier`, and clearly document that this shouldn't be used to resolve temp views.
2. update `ResolveSessionCatalog` to explicitly look up temp views first before calling `CatalogAndIdentifier`, for v1 commands that support temp views.

### Why are the changes needed?

To avoid releasing a behavior that we should not support.

Removing the hack also fixes the problem we hit in https://github.com/apache/spark/pull/27532/files#diff-57b3d87be744b7d79a9beacf8e5e5eb2R937

### Does this PR introduce any user-facing change?

yes, now it's not allowed to refer to a temp view with `spark_catalog` prefix.

### How was this patch tested?

new tests

Closes #27550 from cloud-fan/ns.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-17 12:07:46 +08:00
Maxim Gekk 8b73b92aad [SPARK-30826][SQL] Respect reference case in StringStartsWith pushed down to parquet
### What changes were proposed in this pull request?
In the PR, I propose to convert the attribute name of `StringStartsWith` pushed down to the Parquet datasource to column reference via the `nameToParquetField` map. Similar conversions are performed for other source filters pushed down to parquet.

### Why are the changes needed?
This fixes the bug described in [SPARK-30826](https://issues.apache.org/jira/browse/SPARK-30826). The query from an external table:
```sql
CREATE TABLE t1 (col STRING)
USING parquet
OPTIONS (path '$path')
```
created on top of written parquet files by `Seq("42").toDF("COL").write.parquet(path)` returns wrong empty result:
```scala
spark.sql("SELECT * FROM t1 WHERE col LIKE '4%'").show
+---+
|col|
+---+
+---+
```

### Does this PR introduce any user-facing change?
Yes. After the changes the result is correct for the example above:
```scala
spark.sql("SELECT * FROM t1 WHERE col LIKE '4%'").show
+---+
|col|
+---+
| 42|
+---+
```

### How was this patch tested?
Added a test to `ParquetFilterSuite`

Closes #27574 from MaxGekk/parquet-StringStartsWith-case-sens.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-15 19:49:58 +08:00
DB Tsai d0f9614760 [SPARK-30289][SQL] Partitioned by Nested Column for InMemoryTable
### What changes were proposed in this pull request?
1. `InMemoryTable` was flatting the nested columns, and then the flatten columns was used to look up the indices which is not correct.

This PR implements partitioned by nested column for `InMemoryTable`.

### Why are the changes needed?

This PR implements partitioned by nested column for `InMemoryTable`, so we can test this features in DSv2

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing unit tests and new tests.

Closes #26929 from dbtsai/addTests.

Authored-by: DB Tsai <d_tsai@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2020-02-14 21:46:01 +00:00
Maxim Gekk 7137a6d065 [SPARK-30766][SQL] Fix the timestamp truncation to the HOUR and DAY levels
### What changes were proposed in this pull request?
In the PR, I propose to use Java 8 time API in timestamp truncations to the levels of `HOUR` and `DAY`. The problem is in the usage of `timeZone.getOffset(millis)` in days/hours truncations where the combined calendar (Julian + Gregorian) is used underneath.

### Why are the changes needed?
The change fix wrong truncations. For example, the following truncation to hours should print `0010-01-01 01:00:00` but it outputs wrong timestamp:
```scala
Seq("0010-01-01 01:02:03.123456").toDF()
    .select($"value".cast("timestamp").as("ts"))
    .select(date_trunc("HOUR", $"ts").cast("string"))
    .show(false)
+------------------------------------+
|CAST(date_trunc(HOUR, ts) AS STRING)|
+------------------------------------+
|0010-01-01 01:30:17                 |
+------------------------------------+
```

### Does this PR introduce any user-facing change?
Yes. After the changes, the result of the example above is:
```scala
+------------------------------------+
|CAST(date_trunc(HOUR, ts) AS STRING)|
+------------------------------------+
|0010-01-01 01:00:00                 |
+------------------------------------+
```

### How was this patch tested?
- Added new test to `DateFunctionsSuite`
- By `DateExpressionsSuite` and `DateTimeUtilsSuite`

Closes #27512 from MaxGekk/fix-trunc-old-timestamp.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-14 22:16:57 +08:00
HyukjinKwon 2a270a731a [SPARK-30810][SQL] Parses and convert a CSV Dataset having different column from 'value' in csv(dataset) API
### What changes were proposed in this pull request?

This PR fixes `DataFrameReader.csv(dataset: Dataset[String])` API to take a `Dataset[String]` originated from a column name different from `value`. This is a long-standing bug started from the very first place.

`CSVUtils.filterCommentAndEmpty` assumed the `Dataset[String]` to be originated with `value` column. This PR changes to use the first column name in the schema.

### Why are the changes needed?

For  `DataFrameReader.csv(dataset: Dataset[String])` to support any `Dataset[String]` as the signature indicates.

### Does this PR introduce any user-facing change?
Yes,

```scala
val ds = spark.range(2).selectExpr("concat('a,b,', id) AS text").as[String]
spark.read.option("header", true).option("inferSchema", true).csv(ds).show()
```

Before:

```
org.apache.spark.sql.AnalysisException: cannot resolve '`value`' given input columns: [text];;
'Filter (length(trim('value, None)) > 0)
+- Project [concat(a,b,, cast(id#0L as string)) AS text#2]
   +- Range (0, 2, step=1, splits=Some(2))
```

After:

```
+---+---+---+
|  a|  b|  0|
+---+---+---+
|  a|  b|  1|
+---+---+---+
```

### How was this patch tested?

Unittest was added.

Closes #27561 from HyukjinKwon/SPARK-30810.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-02-14 18:20:18 +08:00