Commit graph

29676 commits

Author SHA1 Message Date
angerszhu a98dc60408 [SPARK-33308][SQL] Refactor current grouping analytics
### What changes were proposed in this pull request?
As discussed in
https://github.com/apache/spark/pull/30145#discussion_r514728642
https://github.com/apache/spark/pull/30145#discussion_r514734648

We need to rewrite current Grouping Analytics grammar to support  as flexible as Postgres SQL to support subsequent development.
In  postgres sql, it support
```
select a, b, c, count(1) from t group by cube (a, b, c);
select a, b, c, count(1) from t group by cube(a, b, c);
select a, b, c, count(1) from t group by cube (a, b, c, (a, b), (a, b, c));
select a, b, c, count(1) from t group by rollup(a, b, c);
select a, b, c, count(1) from t group by rollup (a, b, c);
select a, b, c, count(1) from t group by rollup (a, b, c, (a, b), (a, b, c));
```
In this pr,  we have done three things as below, and we will split it to different pr:

 - Refactor CUBE/ROLLUP (regarding them as ANTLR tokens in a parser)
 - Refactor GROUPING SETS (the logical node -> a new expr)
 - Support new syntax for CUBE/ROLLUP (e.g., GROUP BY CUBE ((a, b), (a, c)))

### Why are the changes needed?
Rewrite current Grouping Analytics grammar to support  as flexible as Postgres SQL to support subsequent development.

### Does this PR introduce _any_ user-facing change?
User can  write Grouping Analytics grammar as flexible as Postgres SQL to support subsequent development.

### How was this patch tested?
Added UT

Closes #30212 from AngersZhuuuu/refact-grouping-analytics.

Lead-authored-by: angerszhu <angers.zhu@gmail.com>
Co-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Co-authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-30 12:31:58 +00:00
Cheng Su 935aa8c8db [SPARK-32985][SQL][FOLLOWUP] Rename createNonBucketedReadRDD and minor change in FileSourceScanExec
### What changes were proposed in this pull request?

This PR is a followup change to address comments in https://github.com/apache/spark/pull/31413#discussion_r603280965 and https://github.com/apache/spark/pull/31413#discussion_r603296475 . Minor change in `FileSourceScanExec`. No actual logic change here.

### Why are the changes needed?

Better readability.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests.

Closes #32000 from c21/bucket-scan.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-30 19:57:32 +09:00
David Li 1237124062 [SPARK-34463][PYSPARK][DOCS] Document caveats of Arrow selfDestruct
### What changes were proposed in this pull request?

As a followup for #29818, document caveats of using the Arrow selfDestruct option in toPandas, which include:
- toPandas() may be slower;
- the resulting dataframe may not support some Pandas operations due to immutable backing arrays.

### Why are the changes needed?

This will hopefully reduce user confusion as with SPARK-34463.

### Does this PR introduce _any_ user-facing change?

Yes - documentation is updated and a config setting description is updated to clearly indicate the config is experimental.

### How was this patch tested?
This is a documentation-only change.

Closes #31738 from lidavidm/spark-34463.

Authored-by: David Li <li.davidm96@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-30 13:30:27 +09:00
yangjie01 7158e7f986 [SPARK-34900][TEST] Make sure benchmarks can run using spark-submit cmd described in the guide
### What changes were proposed in this pull request?
Some `spark-submit`  commands used to run benchmarks in the user's guide is wrong, we can't use these commands to run benchmarks successful.

So the major changes of this pr is correct these wrong commands, for example, run a benchmark which inherits from `SqlBasedBenchmark`, we must specify `--jars <spark core test jar>,<spark catalyst test jar>` because `SqlBasedBenchmark` based benchmark extends `BenchmarkBase(defined in spark core test jar)` and `SQLHelper(defined in spark catalyst test jar)`.

Another change of this pr is removed the `scalatest Assertions` dependency of Benchmarks because `scalatest-*.jar` are not in the distribution package, it will be troublesome to use.

### Why are the changes needed?
Make sure benchmarks can run using spark-submit cmd described in the guide

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Use the corrected `spark-submit` commands to run benchmarks successfully.

Closes #31995 from LuciferYang/fix-benchmark-guide.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-30 11:58:01 +09:00
Kent Yao 5692aa0c2c [SPARK-34894][CORE] Use 'io.connectionTimeout' as a hint instead of 'spark.network.timeout' for lost connections
### What changes were proposed in this pull request?

Currently, when a connection for TransportClient is marked as idled and closed, we suggest users adjust `spark.network.timeout` for all transport modules. As a lot of timeout configs will fallback to the `spark.network.timeout`, this could be a piece of overkill advice, we should give a more targeted one with `spark.${moduleName}.io.connectionTimeout`

### Why are the changes needed?

better advise for overloaded network traffic cases

### Does this PR introduce _any_ user-facing change?

yes, when a connection is zombied and closed by spark internally, users can use a more targeted config to tune their jobs
### How was this patch tested?

Just log and doc. Passing Jenkins and GA

Closes #31990 from yaooqinn/SPARK-34894.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
2021-03-30 09:58:24 +08:00
Yuming Wang fcef2375a3 [SPARK-34622][SQL] Push down limit through Project with Join
### What changes were proposed in this pull request?

There is a `Project` between `LocalLimit` and `Join` if `Join`'s output do not match the `LocalLimit`'s output. This pr add support push down limit through this case. For example:
   ```scala
   spark.sql("create table t1(a int, b int, c int) using parquet")
   spark.sql("create table t2(x int, y int, z int) using parquet")
   spark.sql("select a from t1 left join t2 on a = x and b = y limit 5").explain("extended")
   ```

   ```
   == Optimized Logical Plan ==
   GlobalLimit 5
   +- LocalLimit 5
      +- Project [a#0]
         +- Join LeftOuter, ((a#0 = x#3) AND (b#1 = y#4))
            :- Project [a#0, b#1]
            :  +- Relation default.t1[a#0,b#1,c#2] parquet
            +- Project [x#3, y#4]
               +- Filter (isnotnull(x#3) AND isnotnull(y#4))
                  +- Relation default.t2[x#3,y#4,z#5] parquet
   ```

   After this pr:
   ```
   == Optimized Logical Plan ==
   GlobalLimit 5
   +- LocalLimit 5
      +- Project [a#0]
         +- Join LeftOuter, ((a#0 = x#3) AND (b#1 = y#4))
            :- LocalLimit 5
            :  +- Project [a#0, b#1]
            :     +- Relation default.t1[a#0,b#1,c#2] parquet
            +- Project [x#3, y#4]
               +- Filter (isnotnull(x#3) AND isnotnull(y#4))
                  +- Relation default.t2[x#3,y#4,z#5] parquet
   ```

### Why are the changes needed?

Improve limit push down to improve query performance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #31739 from wangyum/SPARK-34622.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-03-30 10:45:30 +09:00
Baohe Zhang b2bfe985e8 [SPARK-34845][CORE] ProcfsMetricsGetter shouldn't return partial procfs metrics
### What changes were proposed in this pull request?
In ProcfsMetricsGetter.scala, propogating IOException from addProcfsMetricsFromOneProcess to computeAllMetrics when the child pid's proc stat file is unavailable. As a result, the for-loop in computeAllMetrics() can terminate earlier and return an all-0 procfs metric.

### Why are the changes needed?
In the case of a child pid's stat file missing and the subsequent child pids' stat files exist, ProcfsMetricsGetter.computeAllMetrics() will return partial metrics (the sum of a subset of child pids), which can be misleading and is undesired per the existing code comments in https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214.

Also, a side effect of this bug is that it can lead to a verbose warning log if many pids' stat files are missing. An early terminating can make the warning logs more concise.

The unit test can also explain the bug well.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
A unit test is added.

Closes #31945 from baohe-zhang/SPARK-34845.

Authored-by: Baohe Zhang <baohe.zhang@verizonmedia.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-03-29 07:46:58 -07:00
Jungtaek Lim 43e08b1f0f [SPARK-34255][SQL] Support partitioning with static number on required distribution and ordering on V2 write
### What changes were proposed in this pull request?

This PR proposes to extend the functionality of requirement for distribution and ordering on V2 write to specify the number of partitioning on repartition, so that data source is able to control the parallelism and determine the data distribution per partition in prior.

The partitioning with static number is optional, and by default disabled via default method, so only implementations required to restrict the number of partition statically need to override the method and provide the number.

Note that we don't support static number of partitions with unspecified distribution for this PR, as we haven't found the real use cases, and for hypothetical case the static number isn't good enough. Javadoc clearly describes the limitation.

### Why are the changes needed?

The use case comes from feature parity with DSv1.

I have state data source which enables the state in SS to be rewritten, which enables repartitioning, schema evolution, etc via batch query. The writer requires hash partitioning against group key, with the "desired number of partitions", which is same as what Spark does read and write against state.

This is now implemented as DSv1, and the requirement is simply done by calling repartition with the "desired number".

```
val fullPathsForKeyColumns = keySchema.map(key => new Column(s"key.${key.name}"))
data
  .repartition(newPartitions, fullPathsForKeyColumns: _*)
  .queryExecution
  .toRdd
  .foreachPartition(
    writeFn(resolvedCpLocation, version, operatorId, storeName, keySchema, valueSchema,
      storeConf, hadoopConfBroadcast, queryId))
```

Thanks to SPARK-34026, it's now possible to require the hash partitioning, but still not able to require the number of partitions. This PR will enable to let data source require the number of partitions.

### Does this PR introduce _any_ user-facing change?

Yes, but only for data source implementors. Even for them, this is no breaking change as default method is added.

### How was this patch tested?

Added UTs.

Closes #31355 from HeartSaVioR/SPARK-34255.

Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-29 14:33:23 +00:00
Kousuke Saruta 14c7bb877d [SPARK-34872][SQL] quoteIfNeeded should quote a name which contains non-word characters
### What changes were proposed in this pull request?

This PR fixes an issue that `quoteIfNeeded` quotes a name only if it contains `.` or ``` ` ```.
This method should quote it if it contains non-word characters.

### Why are the changes needed?

It's a potential bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #31964 from sarutak/fix-quoteIfNeeded.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-29 09:31:24 +00:00
Angerszhuuuu 015c59843c [SPARK-34879][SQL] HiveInspector supports DayTimeIntervalType and YearMonthIntervalType
### What changes were proposed in this pull request?
Make HiveInspector support DayTimeIntervalType and YearMonthIntervalType.
Then we can use these two types in HiveUDF and HiveScriptTransformation

### Why are the changes needed?
Support more data type when use hive serde

### Does this PR introduce _any_ user-facing change?
User can use  `DayTimeIntervalType` and `YearMonthIntervalType` in HiveUDF and  HiveScriptTransformation

### How was this patch tested?
Added UT

Closes #31979 from AngersZhuuuu/SPARK-34879.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-03-29 08:38:20 +03:00
Angerszhuuuu 2356cdd420 [SPARK-34814][SQL] LikeSimplification should handle NULL
### What changes were proposed in this pull request?
LikeSimplification should handle NULL.

UT will failed  before this pr
```
  test("SPARK-34814: LikeSimplification should handle NULL") {
    withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
      ConstantFolding.getClass.getName.stripSuffix("$")) {
      checkEvaluation(Literal.create("foo", StringType)
        .likeAll("%foo%", Literal.create(null, StringType)), null)
    }
  }

[info] - test *** FAILED *** (2 seconds, 443 milliseconds)
[info]   java.lang.NullPointerException:
[info]   at org.apache.spark.sql.catalyst.optimizer.LikeSimplification$.$anonfun$simplifyMultiLike$1(expressions.scala:697)
[info]   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
[info]   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
[info]   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
[info]   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
[info]   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
[info]   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
[info]   at org.apache.spark.sql.catalyst.optimizer.LikeSimplification$.org$apache$spark$sql$catalyst$optimizer$LikeSimplification$$simplifyMultiLike(expressions.scala:697)
[info]   at org.apache.spark.sql.catalyst.optimizer.LikeSimplification$$anonfun$apply$9.applyOrElse(expressions.scala:722)
[info]   at org.apache.spark.sql.catalyst.optimizer.LikeSimplification$$anonfun$apply$9.applyOrElse(expressions.scala:714)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:316)
[info]   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:316)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:321)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:321)
[info]   at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsDown$1(QueryPlan.scala:94)
[info]   at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:116)
[info]   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
```

### Why are the changes needed?
Fix bug

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #31976 from AngersZhuuuu/SPARK-34814.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-29 12:05:00 +09:00
Tanel Kiis 4b9e94c444 [SPARK-34876][SQL] Fill defaultResult of non-nullable aggregates
### What changes were proposed in this pull request?

Filled the `defaultResult` field on non-nullable aggregates

### Why are the changes needed?

The `defaultResult` defaults to `None` and in some situations (like correlated scalar subqueries) it is used for the value of the aggregation.

The UT result before the fix:
```
-- !query
SELECT t1a,
   (SELECT count(t2d) FROM t2 WHERE t2a = t1a) count_t2,
   (SELECT count_if(t2d > 0) FROM t2 WHERE t2a = t1a) count_if_t2,
   (SELECT approx_count_distinct(t2d) FROM t2 WHERE t2a = t1a) approx_count_distinct_t2,
   (SELECT collect_list(t2d) FROM t2 WHERE t2a = t1a) collect_list_t2,
   (SELECT collect_set(t2d) FROM t2 WHERE t2a = t1a) collect_set_t2,
    (SELECT hex(count_min_sketch(t2d, 0.5d, 0.5d, 1)) FROM t2 WHERE t2a = t1a) collect_set_t2
FROM t1
-- !query schema
struct<t1a:string,count_t2:bigint,count_if_t2:bigint,approx_count_distinct_t2:bigint,collect_list_t2:array<bigint>,collect_set_t2:array<bigint>,collect_set_t2:string>
-- !query output
val1a	0	0	NULL	NULL	NULL	NULL
val1a	0	0	NULL	NULL	NULL	NULL
val1a	0	0	NULL	NULL	NULL	NULL
val1a	0	0	NULL	NULL	NULL	NULL
val1b	6	6	3	[19,119,319,19,19,19]	[19,119,319]	0000000100000000000000060000000100000004000000005D8D6AB90000000000000000000000000000000400000000000000010000000000000001
val1c	2	2	2	[219,19]	[219,19]	0000000100000000000000020000000100000004000000005D8D6AB90000000000000000000000000000000100000000000000000000000000000001
val1d	0	0	NULL	NULL	NULL	NULL
val1d	0	0	NULL	NULL	NULL	NULL
val1d	0	0	NULL	NULL	NULL	NULL
val1e	1	1	1	[19]	[19]	0000000100000000000000010000000100000004000000005D8D6AB90000000000000000000000000000000100000000000000000000000000000000
val1e	1	1	1	[19]	[19]	0000000100000000000000010000000100000004000000005D8D6AB90000000000000000000000000000000100000000000000000000000000000000
val1e	1	1	1	[19]	[19]	0000000100000000000000010000000100000004000000005D8D6AB90000000000000000000000000000000100000000000000000000000000000000
```

### Does this PR introduce _any_ user-facing change?

Bugfix

### How was this patch tested?

UT

Closes #31973 from tanelk/SPARK-34876_non_nullable_agg_subquery.

Authored-by: Tanel Kiis <tanel.kiis@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-29 11:47:08 +09:00
hanover-fiste 4fceef0159 [SPARK-34843][SQL] Calculate more precise partition stride in JDBCRelation
### What changes were proposed in this pull request?
The changes being proposed are to increase the accuracy of JDBCRelation's stride calculation, as outlined in: https://issues.apache.org/jira/browse/SPARK-34843

In summary:

Currently, in JDBCRelation (line 123), the stride size is calculated as follows:
val stride: Long = upperBound / numPartitions - lowerBound / numPartitions

Due to truncation happening on both divisions, the stride size can fall short of what it should be. This can lead to a big difference between the provided upper bound and the actual start of the last partition.

I'm proposing a different formula that doesn't truncate to early, and also maintains accuracy using fixed-point decimals. This helps tremendously with the size of the last partition, which can be even more amplified if there is data skew in that direction. In a real-life test, I've seen a 27% increase in performance with this more proper stride alignment. The reason for fixed-point decimals instead of floating-point decimals is because inaccuracy due to limitation of what the float can represent. This may seem small, but could shift the midpoint a bit, and depending on how granular the data is, that could translate to quite a difference. It's also just inaccurate, and I'm striving to make the partitioning as accurate as possible, within reason.

Lastly, since the last partition's predicate is determined by how the strides align starting from the lower bound (plus one stride), there can be skew introduced creating a larger last partition compared to the first partition. Therefore, after calculating a more precise stride size, I've also introduced logic to move the first partition's predicate (which is an offset from the lower bound) to a position that closely matches the offset of the last partition's predicate (in relation to the upper bound). This makes the first and last partition more evenly distributed compared to each other, and helps with the last task being the largest (reducing its size).

### Why are the changes needed?
The current implementation is inaccurate and can lead to the last task/partition running much longer than previous tasks. Therefore, you can end up with a single node/core running for an extended period while other nodes/cores are sitting idle.

### Does this PR introduce _any_ user-facing change?
No. I would suspect some users will just get a good performance increase. As stated above, if we were to run our code on Spark that has this change implemented, we would have all of the sudden got a 27% increase in performance.

### How was this patch tested?
I've added two new unit tests. I did need to update one unit test, but when you look at the comparison of the before and after, you'll see better alignment of the partitioning with the new implementation. Given that the lower partition's predicate is exclusive and the upper's is inclusive, the offset of the lower was 3 days, and the offset of the upper was 6 days... that's potentially twice the amount of data in that upper partition (could be much more depending on how the user's data is distributed).

Other unit tests that utilize timestamps and two partitions have maintained their midpoint.

### Examples

I've added results with and without the realignment logic to better highlight both improvements this PR brings.

**Example 1:**
Given the following partition config:
"lowerBound" -> "1930-01-01"
"upperBound" -> "2020-12-31"
"numPartitions" -> 1000

_Old method (exactly what it would be BEFORE this PR):_
First partition: "PartitionColumn" < '1930-02-02' or "PartitionColumn" is null
Last partition: "PartitionColumn" >= '2017-07-11'
_Old method, but with new realingment logic of first partition:_
First partition: "PartitionColumn" < '1931-10-14' or "PartitionColumn" is null
Last partition: "PartitionColumn" >= '2019-03-22'

_New method:_
First partition: "PartitionColumn" < '1930-02-03' or "PartitionColumn" is null
Last partition: "PartitionColumn" >= '2020-04-05'
_New with new realingment logic of first partition (exactly what it would be AFTER this PR):_
First partition: "PartitionColumn" < '1930-06-02' or "PartitionColumn" is null
Last partition: "PartitionColumn" >= '2020-08-02'

**Example 2:**
Given the following partition config:
"lowerBound" -> "1927-04-05",
"upperBound" -> "2020-10-16"
"numPartitions" -> 2000

_Old method (exactly what it would be BEFORE this PR):_
First partition: "PartitionColumn" < '1927-04-21' or "PartitionColumn" is null
Last partition: "PartitionColumn" >= '2014-10-29'
_Old method, but with new realingment logic of first partition::_
First partition: "PartitionColumn" < '1930-04-07' or "PartitionColumn" is null
Last partition: "PartitionColumn" >= '2017-10-15'

_New method:_
First partition: "PartitionColumn" < '1927-04-22' or "PartitionColumn" is null
Last partition: "PartitionColumn" >= '2020-04-19'
_New method with new realingment logic of first partition (exactly what it would be AFTER this PR):_
First partition: "PartitionColumn" < '1927-07-13' or "PartitionColumn" is null
Last partition: "PartitionColumn" >= '2020-07-10'

Closes #31965 from hanover-fiste/SPARK-34843.

Authored-by: hanover-fiste <jyarbrough.git@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-03-28 12:59:20 -05:00
Peter Toth 3382190349 [SPARK-34829][SQL] Fix higher order function results
### What changes were proposed in this pull request?
This PR fixes a correctness issue with higher order functions. The results of function expressions needs to be copied in some higher order functions as such an expression can return with internal buffers and higher order functions can call multiple times the expression.
The issue was discovered with typed `ScalaUDF`s after https://github.com/apache/spark/pull/28979.

### Why are the changes needed?
To fix a bug.

### Does this PR introduce _any_ user-facing change?
Yes, some queries return the right results again.

### How was this patch tested?
Added new UT.

Closes #31955 from peter-toth/SPARK-34829-fix-scalaudf-resultconversion.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-03-28 10:01:09 -07:00
Yuming Wang 540f1fb1d9 [SPARK-32855][SQL][FOLLOWUP] Fix code format in SQLConf and comment in PartitionPruning
### What changes were proposed in this pull request?

Fix code format in `SQLConf` and comment in `PartitionPruning`.

### Why are the changes needed?

Make code more readable.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

N/A

Closes #31969 from wangyum/SPARK-32855-2.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-03-28 09:48:54 -07:00
“attilapiros” c8b7a09d39 [SPARK-34869][K8S][TEST] Extend "EXTRA LOGS FOR THE FAILED TEST" section of k8s integration test log with the describe pods output
### What changes were proposed in this pull request?

Extending "EXTRA LOGS FOR THE FAILED TEST" section of k8s integration test log with `kubectl describe pods` output for the failed test.

### Why are the changes needed?

PR builds frequently fails as the k8s integration tests are very flaky now in Amplab Jenkins environment.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Locally by making temporary one of the test fail. The output is:

```
21/03/25 16:55:16.722 ScalaTest-main-running-KubernetesSuite INFO KubernetesSuite:

===== EXTRA LOGS FOR THE FAILED TEST

21/03/25 16:55:17.167 ScalaTest-main-running-KubernetesSuite INFO KubernetesSuite: BEGIN driver DESCRIBE POD
Name:         spark-test-app-a2b03971b7c049e8a2629f6a3198842b
Namespace:    35bdb17e308743afaec17538f89a7c3e
Priority:     0
Node:         minikube/192.168.64.119
Start Time:   Thu, 25 Mar 2021 16:52:10 +0100
Labels:       spark-app-locator=75f695685ae44314a99ec13bb39332bc
              spark-app-selector=spark-150230742d364a77927a08eed0222065
              spark-role=driver
Annotations:  <none>
Status:       Succeeded
IP:           172.17.0.4
Containers:
  spark-kubernetes-driver:
    Container ID:  docker://d6d27b0551060d9b094f12d1e232dfb5ae78ce38559680c7126c548996da4d95
    Image:         docker.io/kubespark/spark:3.2.0-SNAPSHOT_9575B805-9CB0-4A16-8A31-AA2F8DDA8EE5
    Image ID:      docker://sha256:3fc556c73a0d5187b5a14dbdc2f69ef292e60b544b4b4d3715f6749417c20918
    Ports:         7078/TCP, 7079/TCP, 4040/TCP
    Host Ports:    0/TCP, 0/TCP, 0/TCP
    Args:
      driver
      --properties-file
      /opt/spark/conf/spark.properties
      --class
      org.apache.spark.examples.SparkPi
      local:///opt/spark/examples/jars/spark-examples_2.12-3.2.0-SNAPSHOT.jar
    State:          Terminated
      Reason:       Completed
      Exit Code:    0
      Started:      Thu, 25 Mar 2021 16:52:11 +0100
      Finished:     Thu, 25 Mar 2021 16:52:20 +0100
    Ready:          False
    Restart Count:  0
    Limits:
      memory:  1408Mi
    Requests:
      cpu:     1
      memory:  1408Mi
    Environment:
      SPARK_USER:                 attilazsoltpiros
      SPARK_APPLICATION_ID:       spark-150230742d364a77927a08eed0222065
      SPARK_DRIVER_BIND_ADDRESS:   (v1:status.podIP)
      SPARK_LOCAL_DIRS:           /var/data/spark-dab6f1c9-e538-40c8-a7d9-3e88f9b82cfa
      SPARK_CONF_DIR:             /opt/spark/conf
    Mounts:
      /opt/spark/conf from spark-conf-volume-driver (rw)
      /var/data/spark-dab6f1c9-e538-40c8-a7d9-3e88f9b82cfa from spark-local-dir-1 (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-nmfwl (ro)
Conditions:
  Type              Status
  Initialized       True
  Ready             False
  ContainersReady   False
  PodScheduled      True
Volumes:
  spark-local-dir-1:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:
    SizeLimit:  <unset>
  spark-conf-volume-driver:
    Type:      ConfigMap (a volume populated by a ConfigMap)
    Name:      spark-drv-c60832786a15ffbe-conf-map
    Optional:  false
  default-token-nmfwl:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  default-token-nmfwl
    Optional:    false
QoS Class:       Burstable
Node-Selectors:  <none>
Tolerations:     node.kubernetes.io/not-ready:NoExecute for 300s
                 node.kubernetes.io/unreachable:NoExecute for 300s
Events:
  Type    Reason     Age   From               Message
  ----    ------     ----  ----               -------
  Normal  Scheduled  3m7s  default-scheduler  Successfully assigned 35bdb17e308743afaec17538f89a7c3e/spark-test-app-a2b03971b7c049e8a2629f6a3198842b to minikube
  Normal  Pulled     3m7s  kubelet, minikube  Container image "docker.io/kubespark/spark:3.2.0-SNAPSHOT_9575B805-9CB0-4A16-8A31-AA2F8DDA8EE5" already present on machine
  Normal  Created    3m7s  kubelet, minikube  Created container spark-kubernetes-driver
  Normal  Started    3m6s  kubelet, minikube  Started container spark-kubernetes-driver
21/03/25 16:55:17.168 ScalaTest-main-running-KubernetesSuite INFO KubernetesSuite: END driver DESCRIBE POD

```

Closes #31962 from attilapiros/SPARK-34869.

Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-03-28 09:44:56 -07:00
Angerszhuuuu 066c055b52 [SPARK-34092][SQL] Support Stage level restful api filter task details by task status
### What changes were proposed in this pull request?
When we want to get stage's detail info with task information, it will return all tasks, the content is huge and always we just want to know some failed tasks/running tasks  with whole stage info to judge is a task has some problem. This pr support
user to use
```
/application/[appid]/stages/[stage-id]?details=true&taskStatus=xxx
/application/[appid]/stages/[stage-id]/[stage-attempted-id]?details=true&taskStatus=xxx
```
to filter task details by task status

### Why are the changes needed?
More flexiable Restful API

### Does this PR introduce _any_ user-facing change?
User can use
```
/application/[appid]/stages/[stage-id]?details=true&taskStatus=xxx
/application/[appid]/stages/[stage-id]/[stage-attempted-id]?details=true&taskStatus=xxx
```
to filter task details by task status

### How was this patch tested?
Added

Closes #31165 from AngersZhuuuu/SPARK-34092.

Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-03-27 16:26:07 -05:00
Dongjoon Hyun e7af44861e [SPARK-34880][SQL][TESTS] Add Parquet ZSTD compression test coverage
### What changes were proposed in this pull request?

Apache Parquet 1.12.0 switches its ZSTD compression from Hadoop codec to its own codec.

### Why are the changes needed?

**Apache Spark 3.1 (It requires libhadoop built with zstd)**
```scala
scala> spark.range(10).write.option("compression", "zstd").parquet("/tmp/a")
21/03/27 08:49:38 ERROR Executor: Exception in task 11.0 in stage 0.0 (TID 11)2]
java.lang.RuntimeException: native zStandard library not available:
this version of libhadoop was built without zstd support.
```

**Apache Spark 3.2 (No libhadoop requirement)**
```scala
scala> spark.range(10).write.option("compression", "zstd").parquet("/tmp/a")
```

### Does this PR introduce _any_ user-facing change?

Yes, this is an improvement.

### How was this patch tested?

Pass the CI with the newly added test coverage.

Closes #31981 from dongjoon-hyun/SPARK-34880.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-03-27 12:48:12 -07:00
Yuming Wang cbffc12f90 [SPARK-34542][BUILD] Upgrade Parquet to 1.12.0
### What changes were proposed in this pull request?

Parquet 1.12.0 New Feature
- PARQUET-41 - Add bloom filters to parquet statistics
- PARQUET-1373 - Encryption key management tools
- PARQUET-1396 - Example of using EncryptionPropertiesFactory and DecryptionPropertiesFactory
- PARQUET-1622 - Add BYTE_STREAM_SPLIT encoding
- PARQUET-1784 - Column-wise configuration
- PARQUET-1817 - Crypto Properties Factory
- PARQUET-1854 - Properties-Driven Interface to Parquet Encryption

Parquet 1.12.0 release notes:
https://github.com/apache/parquet-mr/blob/apache-parquet-1.12.0/CHANGES.md

### Why are the changes needed?

- Bloom filters to improve filter performance
- ZSTD enhancement

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit test.

Closes #31649 from wangyum/SPARK-34542.

Lead-authored-by: Yuming Wang <yumwang@ebay.com>
Co-authored-by: Yuming Wang <yumwang@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-03-27 07:56:29 -07:00
Angerszhuuuu 468b944b00 [SPARK-34841][SQL] Push ANSI interval binary expressions into into (if/else) branches
### What changes were proposed in this pull request?
Push ANSI interval binary expressions into into (if / case) branches

### Why are the changes needed?
Support more binary expression to push into if/else and casewhen

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #31978 from AngersZhuuuu/SPARK-34841.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-03-27 14:50:28 +03:00
Angerszhuuuu 15bf01ef85 [SPARK-34848][SQL][FLLOW-UP] Fix merge conflict issue cause UT failed
### What changes were proposed in this pull request?
Fix issue of  https://github.com/apache/spark/pull/31948#issuecomment-808647121

### Why are the changes needed?
fix ut failed

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existed UT

Closes #31977 from AngersZhuuuu/SPARK-34848-followup.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-03-27 00:44:53 -07:00
Angerszhuuuu 769cf7b966 [SPARK-34744][SQL] Improve error message for casting cause overflow error
### What changes were proposed in this pull request?
Improve error message for casting cause overflow error. We should use DataType's catalogString.

### Why are the changes needed?
Improve error message

### Does this PR introduce _any_ user-facing change?
For example:
```
set spark.sql.ansi.enabled=true;
select tinyint(128) * tinyint(2);
```
Error message before this pr:
```
Casting 128 to scala.Byte$ causes overflow
```
After this pr:
```
Casting 128 to tinyint causes overflow
```

### How was this patch tested?
Added UT

Closes #31971 from AngersZhuuuu/SPARK-34744.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
2021-03-27 11:15:55 +08:00
Josh Soref d58587b60d [SPARK-33717][LAUNCHER] deprecate spark.launcher.childConectionTimeout
### What changes were proposed in this pull request?
Deprecating `spark.launcher.childConectionTimeout` in favor of `spark.launcher.childConnectionTimeout`

### Why are the changes needed?
srowen suggested it https://github.com/apache/spark/pull/30323#discussion_r521449342

### How was this patch tested?
No testing. Not even compiled

Closes #30679 from jsoref/spelling-connection.

Authored-by: Josh Soref <jsoref@users.noreply.github.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-03-26 15:53:52 -05:00
Angerszhuuuu 36a51044a4 [SPARK-34848][CORE] Add duration to TaskMetricDistributions
### What changes were proposed in this pull request?
Task duration distribution is also very important for us to judge whether a stage's task is skew enough.

### Why are the changes needed?
Add important information in TaskMetricsDistribution

### Does this PR introduce _any_ user-facing change?
People can see duration distribution from TaskMetricsDistribution

### How was this patch tested?
Existed UT

Closes #31948 from AngersZhuuuu/SPARK-34848.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-03-26 12:32:20 -05:00
Ruifeng Zheng aa21e86464 [SPARK-34858][SPARK-34448][ML] Binary Logistic Regression with intercept support centering
### What changes were proposed in this pull request?
for binary lr, performance centering by subtracting the mean, if possible;

### Why are the changes needed?
for a feature with small std (i.e. 0.03), the scaled feature may have a large value (i.e. >30),
underlying solvers (**OWLQN/LBFGS/LBFGSB**) can not handle a feature with large values;

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
added testsuite

Closes #31693 from zhengruifeng/blr_center.

Authored-by: Ruifeng Zheng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-03-26 12:28:51 -05:00
Max Gekk 9ba889b6ea [SPARK-34875][SQL] Support divide a day-time interval by a numeric
### What changes were proposed in this pull request?
1. Add new expression `DivideDTInterval` which multiplies a `DayTimeIntervalType` expression by a `NumericType` expression including ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType.
2. Extend binary arithmetic rules to support `day-time interval / numeric`.

### Why are the changes needed?
To conform the ANSI SQL standard which requires such operation over day-time intervals:
<img width="656" alt="Screenshot 2021-03-25 at 18 44 58" src="https://user-images.githubusercontent.com/1580697/112501559-68f07080-8d9a-11eb-8781-66e6631bb7ef.png">

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running new tests:
```
$ build/sbt "test:testOnly *IntervalExpressionsSuite"
$ build/sbt "test:testOnly *ColumnExpressionSuite"
```

Closes #31972 from MaxGekk/div-dt-interval-by-num.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-26 15:36:08 +00:00
HyukjinKwon c8233f1be5 [SPARK-34874][INFRA] Recover test reports for failed GA builds
### What changes were proposed in this pull request?

621becc6d7 (r48726074)
there was a behaviour change in the download artifact plugin and it disabled the test reporting in failed builds.

This PR recovers it by explicitly setting the conclusion from the workflow runs to search for the artifacts to download.

### Why are the changes needed?

In order to properly report the failed test cases.

### Does this PR introduce _any_ user-facing change?

No, it's dev only.

### How was this patch tested?

Manually tested at https://github.com/HyukjinKwon/spark/pull/30

Before:

![Screen Shot 2021-03-26 at 10 54 48 AM](https://user-images.githubusercontent.com/6477701/112566110-b7951d80-8e21-11eb-8fad-f637db9314d5.png)

After:

![Screen Shot 2021-03-26 at 5 04 01 PM](https://user-images.githubusercontent.com/6477701/112606215-7588cd80-8e5b-11eb-8fdd-3afebd629f4f.png)

Closes #31970 from HyukjinKwon/SPARK-34874.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-26 18:12:28 +09:00
Wenchen Fan 61d038f26e Revert "[SPARK-34701][SQL] Remove analyzing temp view again in CreateViewCommand"
This reverts commit da04f1f4f8.
2021-03-26 15:26:48 +08:00
Max Gekk f212c61c43 [SPARK-34868][SQL] Support divide an year-month interval by a numeric
### What changes were proposed in this pull request?
1. Add new expression `DivideYMInterval` which multiplies a `YearMonthIntervalType` expression by a `NumericType` expression including ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType.
2. Extend binary arithmetic rules to support `year-month interval / numeric`.

### Why are the changes needed?
To conform the ANSI SQL standard which requires such operation over year-month intervals:
<img width="656" alt="Screenshot 2021-03-25 at 18 44 58" src="https://user-images.githubusercontent.com/1580697/112501559-68f07080-8d9a-11eb-8781-66e6631bb7ef.png">

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running new tests:
```
$ build/sbt "test:testOnly *IntervalExpressionsSuite"
$ build/sbt "test:testOnly *ColumnExpressionSuite"
```

Closes #31961 from MaxGekk/div-ym-interval-by-num.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-26 05:56:56 +00:00
Yuming Wang aaa0d2a66b [SPARK-32855][SQL] Improve the cost model in pruningHasBenefit for filtering side can not build broadcast by join type
### What changes were proposed in this pull request?

This pr improve the cost model in `pruningHasBenefit` for filtering side can not build broadcast by join type:
1. The filtering side must be small enough to build broadcast by size.
2. The estimated size of the pruning side must be big enough: `estimatePruningSideSize * spark.sql.optimizer.dynamicPartitionPruning.pruningSideExtraFilterRatio > overhead`.

### Why are the changes needed?

Improve query performance for these cases.

This a real case from cluster. Left join and left size very small and right side can build DPP:
![image](https://user-images.githubusercontent.com/5399861/92882197-445a2a00-f442-11ea-955d-16a7724e535b.png)

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #29726 from wangyum/SPARK-32855.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-26 04:48:13 +00:00
Kent Yao 820b465886 [SPARK-34786][SQL] Read Parquet unsigned int64 logical type that stored as signed int64 physical type to decimal(20, 0)
### What changes were proposed in this pull request?

A companion PR for SPARK-34817, when we handle the unsigned int(<=32) logical types. In this PR, we map the unsigned int64 to decimal(20, 0) for better compatibility.

### Why are the changes needed?

Spark won't have unsigned types, but spark should be able to read existing parquet files written by other systems that support unsigned types for better compatibility.

### Does this PR introduce _any_ user-facing change?

yes, we can read parquet uint64 now

### How was this patch tested?

new unit tests

Closes #31960 from yaooqinn/SPARK-34786-2.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
2021-03-26 09:54:19 +08:00
Yuanjian Li 5ffc3897e0 [SPARK-34871][SS] Move the checkpoint location resolving into the rule ResolveWriteToStream
### What changes were proposed in this pull request?
Move the checkpoint location resolving into the rule ResolveWriteToStream, which is added in SPARK-34748.

### Why are the changes needed?
After SPARK-34748, we have a rule ResolveWriteToStream for the analysis logic for the resolving logic of stream write plans. Based on it, we can further move the checkpoint location resolving work in the rule. Then, all the checkpoint resolving logic was done in the analyzer.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UT.

Closes #31963 from xuanyuanking/SPARK-34871.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-26 10:29:50 +09:00
Wenchen Fan 658e95c345 [SPARK-34833][SQL][FOLLOWUP] Handle outer references in all the places
### What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/31940 . This PR generalizes the matching of attributes and outer references, so that outer references are handled everywhere.

Note that, currently correlated subquery has a lot of limitations in Spark, and the newly covered cases are not possible to happen. So this PR is a code refactor.

### Why are the changes needed?

code cleanup

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #31959 from cloud-fan/follow.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-03-26 09:10:03 +09:00
Chandni Singh 6d88212f79 [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …
### What changes were proposed in this pull request?
This PR fixes bugs that causes corruption of push-merged blocks when a client terminates while pushing block. `RemoteBlockPushResolver` was introduced in #30062 (SPARK-32916).

There are 2 scenarios where the merged blocks get corrupted:
1. `StreamCallback.onFailure()` is called more than once. Initially we assumed that the onFailure callback will be called just once per stream. However, we observed that this is called twice when a client connection is reset. When the client connection is reset then there are 2 events that get triggered in this order.
 - `exceptionCaught`. This event is propagated to `StreamInterceptor`. `StreamInterceptor.exceptionCaught()` invokes `callback.onFailure(streamId, cause)`. This is the first time StreamCallback.onFailure() will be invoked.
 - `channelInactive`. Since the channel closes, the `channelInactive` event gets triggered which again is propagated to `StreamInterceptor`. `StreamInterceptor.channelInactive()` invokes `callback.onFailure(streamId, new ClosedChannelException())`. This is the second time  StreamCallback.onFailure() will be invoked.

2. The flag `isWriting` is set prematurely to true. This introduces an edge case where a stream that is trying to merge a duplicate block (created because of a speculative task) may interfere with an active stream if the duplicate stream fails.

Also adding additional changes that improve the code.

1.  Using positional writes all the time because this simplifies the code and with microbenchmarking haven't seen any performance impact.
2. Additional minor changes suggested by mridulm during an internal review.

### Why are the changes needed?
These are bug fixes and simplify the code.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests. I have also tested these changes in Linkedin's internal fork on a cluster.

Co-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Min Shen mshenlinkedin.com

Closes #31934 from otterc/SPARK-32916-followup.

Lead-authored-by: Chandni Singh <singh.chandni@gmail.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-03-25 12:47:46 -05:00
Gengliang Wang 0515f49018 [SPARK-34856][SQL] ANSI mode: Allow casting complex types as string type
### What changes were proposed in this pull request?

Allow casting complex types as string type in ANSI mode.

### Why are the changes needed?

Currently, complex types are not allowed to cast as string type. This breaks the DataFrame.show() API. E.g
```
scala> sql(“select array(1, 2, 2)“).show(false)
org.apache.spark.sql.AnalysisException: cannot resolve ‘CAST(`array(1, 2, 2)` AS STRING)’ due to data type mismatch:
 cannot cast array<int> to string with ANSI mode on.
```
We should allow the conversion as the extension of the ANSI SQL standard, so that the DataFrame.show() still work in ANSI mode.
### Does this PR introduce _any_ user-facing change?

Yes, casting complex types as string type is now allowed in ANSI mode.

### How was this patch tested?

Unit tests.

Closes #31954 from gengliangwang/fixExplicitCast.

Authored-by: Gengliang Wang <ltnwgl@gmail.com>
Signed-off-by: Gengliang Wang <ltnwgl@gmail.com>
2021-03-26 00:17:43 +08:00
Karen Feng 0d91f9c3f3 [SPARK-33600][SQL] Group exception messages in execution/datasources/v2
### What changes were proposed in this pull request?

This PR groups exception messages in `execution/datasources/v2`.

### Why are the changes needed?

It will largely help with standardization of error messages and its maintenance.

### Does this PR introduce _any_ user-facing change?

No. Error messages remain unchanged.

### How was this patch tested?

No new tests - pass all original tests to make sure it doesn't break any existing behavior.

Closes #31619 from karenfeng/spark-33600.

Authored-by: Karen Feng <karen.feng@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-25 16:15:30 +00:00
Tanel Kiis 6ba8445ea3 [SPARK-34822][SQL] Update the plan stability golden files even if only the explain.txt changes
### What changes were proposed in this pull request?

Update the plan stability golden files even if only the `explain.txt` changes.

This is resubmition of #31927. The schema for one of the TPCDS tables was updated and that changed the `explain.txt` for the q17.

### Why are the changes needed?

Currently only `simplified.txt` change is checked. There are some PRs, that update the `explain.txt`, that do not change the `simplified.txt`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

The updated golden files.

Closes #31957 from tanelk/SPARK-34822_update_plan_stability.

Lead-authored-by: Tanel Kiis <tanel.kiis@gmail.com>
Co-authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-25 10:22:49 +00:00
Tim Armstrong 1d6acd584a [SPARK-34857][SQL] Correct AtLeastNNonNulls's explain output
### What changes were proposed in this pull request?
Removed the custom toString implementation of AtLeastNNoneNulls.

### Why are the changes needed?
It shows up wrong in the explain plan. The name of the function is wrong and the actual value of the first argument is not shown. Both of these would make it easier to understand the plan.

```
(12) Filter
Input [3]: [c1#2410L, c2#2419, c3#2422]
Condition : AtLeastNNulls(n, c1#2410L)
```

### Does this PR introduce _any_ user-facing change?
Only the explain plan changes if this function is used.

### How was this patch tested?
Added a simple unit test to make sure that the toString output is correct.

Closes #31956 from timarmstrong/atleastnnonnulls.

Authored-by: Tim Armstrong <tim.armstrong@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2021-03-25 17:20:01 +09:00
Max Gekk a68d7ca8c5 [SPARK-34850][SQL] Support multiply a day-time interval by a numeric
### What changes were proposed in this pull request?
1. Add new expression `MultiplyDTInterval` which multiplies a `DayTimeIntervalType` expression by a `NumericType` expression including ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType.
2. Extend binary arithmetic rules to support `numeric * day-time interval` and `day-time interval * numeric`.
3. Invoke `DoubleMath.roundToInt` in `double/float * year-month interval`.

### Why are the changes needed?
To conform the ANSI SQL standard which requires such operation over day-time intervals:
<img width="667" alt="Screenshot 2021-03-22 at 16 33 16" src="https://user-images.githubusercontent.com/1580697/111997810-77d1eb80-8b2c-11eb-951d-e43911d9c5db.png">

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running new tests:
```
$ build/sbt "test:testOnly *IntervalExpressionsSuite"
$ build/sbt "test:testOnly *ColumnExpressionSuite"
```

Closes #31951 from MaxGekk/mul-day-time-interval.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-03-25 10:46:50 +03:00
Kent Yao 8c6748f691 [SPARK-34817][SQL] Read parquet unsigned types that stored as int32 physical type in parquet
### What changes were proposed in this pull request?

Unsigned types may be used to produce smaller in-memory representations of the data. These types used by frameworks(e.g. hive, pig) using parquet. And parquet will map them to its base types.

see more https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift

```thrift
  /**
   * An unsigned integer value.
   *
   * The number describes the maximum number of meaningful data bits in
   * the stored value. 8, 16 and 32 bit values are stored using the
   * INT32 physical type.  64 bit values are stored using the INT64
   * physical type.
   *
   */
  UINT_8 = 11;
  UINT_16 = 12;
  UINT_32 = 13;
  UINT_64 = 14;
```

```
UInt8-[0:255]
UInt16-[0:65535]
UInt32-[0:4294967295]
UInt64-[0:18446744073709551615]
```

In this PR, we support read UINT_8 as ShortType, UINT_16 as IntegerType, UINT_32 as LongType to fit their range. Support for UINT_64 will be in another PR.

### Why are the changes needed?

better parquet support

### Does this PR introduce _any_ user-facing change?

yes, we can read unit[8/16/32] from parquet files

### How was this patch tested?

new tests

Closes #31921 from yaooqinn/SPARK-34817.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-25 06:58:06 +00:00
Terry Kim da04f1f4f8 [SPARK-34701][SQL] Remove analyzing temp view again in CreateViewCommand
### What changes were proposed in this pull request?

This PR proposes to remove re-analyzing the already analyzed plan for `CreateViewCommand` as discussed https://github.com/apache/spark/pull/31273/files#r581592786.

### Why are the changes needed?

No need to analyze the plan if it's already analyzed.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests should cover this.

Closes #31933 from imback82/remove_analyzed_from_create_temp_view.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-25 06:53:59 +00:00
HyukjinKwon 7838f55ca7 Revert "[SPARK-34822][SQL] Update the plan stability golden files even if only the explain.txt changes"
This reverts commit 84df54b495.
2021-03-25 12:31:08 +09:00
ulysses-you 9d561e6b5e [SPARK-34852][SQL] Close Hive session state should use withHiveState
### What changes were proposed in this pull request?

Wrap Hive sessionStae `close` with `withHiveState`

### Why are the changes needed?

Some reason:

1. Shutdown hook is invoked using different thread
2. Hive may use metasotre client again during closing

Otherwise, we may get such expcetion with custom hive metastore version
```
21/03/24 13:26:18 INFO session.SessionState: Failed to remove classloaders from DataNucleus
java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
	at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1654)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:80)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:130)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:101)
	at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3367)
	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3406)
	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3386)
	at org.apache.hadoop.hive.ql.session.SessionState.unCacheDataNucleusClassLoaders(SessionState.java:1546)
	at org.apache.hadoop.hive.ql.session.SessionState.close(SessionState.java:1536)
	at org.apache.spark.sql.hive.client.HiveClientImpl.closeState(HiveClientImpl.scala:172)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$new$1(HiveClientImpl.scala:175)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
```

### Does this PR introduce _any_ user-facing change?

No, since this not released.

### How was this patch tested?

manual test.

Closes #31949 from ulysses-you/SPARK-34852.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
2021-03-25 10:21:44 +08:00
Takeshi Yamamuro 150769bced [SPARK-34833][SQL] Apply right-padding correctly for correlated subqueries
### What changes were proposed in this pull request?

This PR intends to fix the bug that does not apply right-padding for char types inside correlated subquries.
For example,  a query below returns nothing in master, but a correct result is `c`.
```
scala> sql(s"CREATE TABLE t1(v VARCHAR(3), c CHAR(5)) USING parquet")
scala> sql(s"CREATE TABLE t2(v VARCHAR(5), c CHAR(7)) USING parquet")
scala> sql("INSERT INTO t1 VALUES ('c', 'b')")
scala> sql("INSERT INTO t2 VALUES ('a', 'b')")
scala> val df = sql("""
  |SELECT v FROM t1
  |WHERE 'a' IN (SELECT v FROM t2 WHERE t2.c = t1.c )""".stripMargin)

scala> df.show()
+---+
|  v|
+---+
+---+

```

This is because `ApplyCharTypePadding`  does not handle the case above to apply right-padding into `'abc'`. This PR modifies the code in `ApplyCharTypePadding` for handling it correctly.

```
// Before this PR:
scala> df.explain(true)
== Analyzed Logical Plan ==
v: string
Project [v#13]
+- Filter a IN (list#12 [c#14])
   :  +- Project [v#15]
   :     +- Filter (c#16 = outer(c#14))
   :        +- SubqueryAlias spark_catalog.default.t2
   :           +- Relation default.t2[v#15,c#16] parquet
   +- SubqueryAlias spark_catalog.default.t1
      +- Relation default.t1[v#13,c#14] parquet

scala> df.show()
+---+
|  v|
+---+
+---+

// After this PR:
scala> df.explain(true)
== Analyzed Logical Plan ==
v: string
Project [v#43]
+- Filter a IN (list#42 [c#44])
   :  +- Project [v#45]
   :     +- Filter (c#46 = rpad(outer(c#44), 7,  ))
   :        +- SubqueryAlias spark_catalog.default.t2
   :           +- Relation default.t2[v#45,c#46] parquet
   +- SubqueryAlias spark_catalog.default.t1
      +- Relation default.t1[v#43,c#44] parquet

scala> df.show()
+---+
|  v|
+---+
|  c|
+---+
```

This fix is lated to TPCDS q17; the query returns nothing because of this bug: https://github.com/apache/spark/pull/31886/files#r599333799

### Why are the changes needed?

Bugfix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests added.

Closes #31940 from maropu/FixCharPadding.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-03-25 08:31:57 +09:00
Ruifeng Zheng 88cf86f56b [SPARK-34797][ML] Refactor Logistic Aggregator - support virtual centering
### What changes were proposed in this pull request?

1, add `BinaryLogisticBlockAggregator` and `MultinomialLogisticBlockAggregator` and related testsuites;
2, impl `virtual centering` in standardization;
3, remove old `LogisticAggregator`

### Why are the changes needed?
previous [pr](https://github.com/apache/spark/pull/31693) and related works is too large, we need to split it into 3 prs:
1, this one, impl new agg supporting `virtual centering`, remove old `LogisticAggregator`;
2, adopt new blor-agg in lor,  add new test suite for small var features;
3, adopt new mlor-agg in lor,  add new test suite for small var features, remove `BlockLogisticAggregator`;

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
added testsuites

Closes #31889 from zhengruifeng/blor_mlor_agg.

Authored-by: Ruifeng Zheng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-03-24 12:16:21 -05:00
Gengliang Wang abfd9b23cd [SPARK-34769][SQL] AnsiTypeCoercion: return closest convertible type among TypeCollection
### What changes were proposed in this pull request?

Currently, when implicit casting a data type to a `TypeCollection`, Spark returns the first convertible data type among `TypeCollection`.
In ANSI mode, we can make the behavior more reasonable by returning the closet convertible data type in `TypeCollection`.

In details, we first try to find the all the expected types we can implicitly cast:
1. if there is no convertible data types, return None;
2. if there is only one convertible data type, cast input as it;
3. otherwise if there are multiple convertible data types, find the closet data
type among them. If there is no such closet data type, return None.

Note that if the closet type is Float type and the convertible types contains Double type, simply return Double type as the closet type to avoid potential
precision loss on converting the Integral type as Float type.

### Why are the changes needed?

Make the type coercion rule for TypeCollection more reasonable and ANSI compatible.
E.g. returning Long instead of Double for`implicast(int, TypeCollect(Double, Long))`.

From ANSI SQL Spec section 4.33 "SQL-invoked routines"
![Screen Shot 2021-03-17 at 4 05 06 PM](https://user-images.githubusercontent.com/1097932/111434916-5e104e80-86bd-11eb-8b3b-33090a68067d.png)

Section 9.6 "Subject routine determination"
![Screen Shot 2021-03-17 at 1 36 55 PM](https://user-images.githubusercontent.com/1097932/111420336-48445e80-86a8-11eb-9d50-34b325043bdb.png)

Section 10.4 "routine invocation"
![Screen Shot 2021-03-17 at 4 08 41 PM](https://user-images.githubusercontent.com/1097932/111434926-610b3f00-86bd-11eb-8c32-8c7935e055da.png)

### Does this PR introduce _any_ user-facing change?

Yes, in ANSI mode, implicit casting to a `TypeCollection` returns the narrowest convertible data type instead of the first convertible one.

### How was this patch tested?

Unit tests.

Closes #31859 from gengliangwang/implicitCastTypeCollection.

Lead-authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Co-authored-by: Gengliang Wang <ltnwgl@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-24 15:04:03 +00:00
Tanel Kiis 84df54b495 [SPARK-34822][SQL] Update the plan stability golden files even if only the explain.txt changes
### What changes were proposed in this pull request?

Update the plan stability golden files even if only the `explain.txt` changes.

### Why are the changes needed?

Currently only `simplified.txt` change is checked. There are some PRs, that update the `explain.txt`, that do not change the `simplified.txt`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

The updated golden files.

Closes #31927 from tanelk/SPARK-34822_update_plan_stability.

Lead-authored-by: Tanel Kiis <tanel.kiis@gmail.com>
Co-authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-03-24 14:36:51 +00:00
Danny Meijer ad211ccd9d
[SPARK-34630][PYTHON][SQL] Added typehint for pyspark.sql.Column.contains
### What changes were proposed in this pull request?

This PR implements the missing typehints as per SPARK-34630.

### Why are the changes needed?

To satisfy the aforementioned Jira ticket

### Does this PR introduce _any_ user-facing change?

No, just adding a missing typehint for Project Zen

### How was this patch tested?

No tests needed (just adding a typehint)

Closes #31823 from dannymeijer/feature/SPARK-34630.

Authored-by: Danny Meijer <danny.meijer@nike.com>
Signed-off-by: zero323 <mszymkiewicz@gmail.com>
2021-03-24 15:21:19 +01:00
Cheng Su 35c70e417d [SPARK-34853][SQL] Remove duplicated definition of output partitioning/ordering for limit operator
### What changes were proposed in this pull request?

Both local limit and global limit define the output partitioning and output ordering in the same way and this is duplicated (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala#L159-L175 ). We can move the output partitioning and ordering into their parent trait - `BaseLimitExec`. This is doable as `BaseLimitExec` has no more other child class. This is a minor code refactoring.

### Why are the changes needed?

Clean up the code a little bit. Better readability.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pure refactoring. Rely on existing unit tests.

Closes #31950 from c21/limit-cleanup.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2021-03-24 23:06:35 +09:00
Angerszhuuuu 8ed5808f64 [SPARK-34488][CORE] Support task Metrics Distributions and executor Metrics Distributions in the REST API call for a specified stage
### What changes were proposed in this pull request?
For a specific stage, it is useful to show the task metrics in percentile distribution.  This information can help users know whether or not there is a skew/bottleneck among tasks in a given stage.  We list an example in taskMetricsDistributions.json

Similarly, it is useful to show the executor metrics in percentile distribution for a specific stage. This information can show whether or not there is a skewed load on some executors.  We list an example in executorMetricsDistributions.json

We define `withSummaries` and `quantiles` query parameter in the REST API for a specific stage as:

applications/<application_id>/<application_attempt/stages/<stage_id>/<stage_attempt>?withSummaries=[true|false]& quantiles=0.05,0.25,0.5,0.75,0.95

1. withSummaries: default is false, define whether to show current stage's taskMetricsDistribution and executorMetricsDistribution
2. quantiles: default is `0.0,0.25,0.5,0.75,1.0` only effect when `withSummaries=true`, it define the quantiles we use when calculating metrics distributions.

When withSummaries=true, both task metrics in percentile distribution and executor metrics in percentile distribution are included in the REST API output.  The default value of withSummaries is false, i.e. no metrics percentile distribution will be included in the REST API output.

 

### Why are the changes needed?
For a specific stage, it is useful to show the task metrics in percentile distribution.  This information can help users know whether or not there is a skew/bottleneck among tasks in a given stage.  We list an example in taskMetricsDistributions.json

### Does this PR introduce _any_ user-facing change?
User can  use  below restful API to get task metrics distribution and executor metrics distribution for indivial stage
```
applications/<application_id>/<application_attempt/stages/<stage_id>/<stage_attempt>?withSummaries=[true|false]
```

### How was this patch tested?
Added UT

Closes #31611 from AngersZhuuuu/SPARK-34488.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-03-24 08:50:45 -05:00