### What changes were proposed in this pull request?
The main code change is:
* Change rule `DemoteBroadcastHashJoin` to `DynamicJoinSelection` and add shuffle hash join selection code.
* Specify a join strategy hint `SHUFFLE_HASH` if AQE think a join can be converted to SHJ.
* Skip `preferSortMerge` config check in AQE side if a join can be converted to SHJ.
### Why are the changes needed?
Use AQE runtime statistics to decide if we can use shuffled hash join instead of sort merge join. Currently, the formula of shuffled hash join selection dose not work due to the dymanic shuffle partition number.
Add a new config spark.sql.adaptive.shuffledHashJoinLocalMapThreshold to decide if join can be converted to shuffled hash join safely.
### Does this PR introduce _any_ user-facing change?
Yes, add a new config.
### How was this patch tested?
Add test.
Closes#32550 from ulysses-you/SPARK-35282-2.
Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Add the metrics to record how many tasks fallback to sort-based aggregation for hash aggregation. This will help developers and users to debug and optimize query. Object hash aggregation has similar metrics already.
### Why are the changes needed?
Help developers and users to debug and optimize query with hash aggregation.
### Does this PR introduce _any_ user-facing change?
Yes, the added metrics will show up in Spark web UI.
Example:
<img width="604" alt="Screen Shot 2021-05-26 at 12 17 08 AM" src="https://user-images.githubusercontent.com/4629931/119618437-bf3c5880-bdb7-11eb-89bb-5b88db78639f.png">
### How was this patch tested?
Changed unit test in `SQLMetricsSuite.scala`.
Closes#32671 from c21/agg-metrics.
Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR fixes `HiveExternalCatalogVersionsSuite`.
With this change, only <major>.<minor> version is set to `spark.sql.hive.metastore.version`.
### Why are the changes needed?
I'm personally checking whether all the tests pass with Java 11 for the current `master` and I found `HiveExternalCatalogVersionsSuite` fails.
The reason is that Spark 3.0.2 and 3.1.1 doesn't accept `2.3.8` as a hive metastore version.
`HiveExternalCatalogVersionsSuite` downloads Spark releases from https://dist.apache.org/repos/dist/release/spark/ and run test for each release. The Spark releases are `3.0.2` and `3.1.1` for the current `master` for now.
e47e615c0e/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala (L239-L259)
With Java 11, the suite run with a hive metastore version which corresponds to the builtin Hive version and it's `2.3.8` for the current `master`.
20750a3f9e/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala (L62-L66)
But `branch-3.0` and `branch-3.1` doesn't accept `2.3.8`, the suite with Java 11 fails.
Another solution would be backporting SPARK-34271 (#31371) but after [a discussion](https://github.com/apache/spark/pull/32668#issuecomment-848435170), we prefer to fix the test,
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests with CI.
Closes#32670 from sarutak/fix-version-suite-for-java11.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR aims to upgrade json4s from 3.7.0-M5 to 3.7.0-M11
Note: json4s version greater than 3.7.0-M11 is not binary compatible with Spark third party jars
### Why are the changes needed?
Multiple defect fixes and improvements like
https://github.com/json4s/json4s/issues/750https://github.com/json4s/json4s/issues/554https://github.com/json4s/json4s/issues/715
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Ran with the existing UTs
Closes#32636 from vinodkc/br_build_upgrade_json4s.
Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
Add the function type, such as "scala_udf", "python_udf", "java_udf", "hive", "built-in" to the `ExpressionInfo` for UDF.
### Why are the changes needed?
Make the `ExpressionInfo` of UDF more meaningful
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing and newly added UT
Closes#32587 from linhongliu-db/udf-language.
Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR proposes to use a proper built-in exceptions instead of the plain `Exception` in Python.
While I am here, I fixed another minor issue at `DataFrams.schema` together:
```diff
- except AttributeError as e:
- raise Exception(
- "Unable to parse datatype from schema. %s" % e)
+ except Exception as e:
+ raise ValueError(
+ "Unable to parse datatype from schema. %s" % e) from e
```
Now it catches all exceptions during schema parsing, chains the exception with `ValueError`. Previously it only caught `AttributeError` that does not catch all cases.
### Why are the changes needed?
For users to expect the proper exceptions.
### Does this PR introduce _any_ user-facing change?
Yeah, the exception classes became different but should be compatible because previous exception was plain `Exception` which other exceptions inherit.
### How was this patch tested?
Existing unittests should cover,
Closes#31238Closes#32650 from HyukjinKwon/SPARK-32194.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR improves the interaction between partition coalescing and skew handling by moving the skew join rule ahead of the partition coalescing rule and making corresponding changes to the two rules:
1. Simplify `OptimizeSkewedJoin` as it doesn't need to handle `CustomShuffleReaderExec` anymore.
2. Update `CoalesceShufflePartitions` to support coalescing non-skewed partitions.
### Why are the changes needed?
It's a bit hard to reason about skew join if partitions have been coalesced. A skewed partition needs to be much larger than other partitions and we need to look at the raw sizes before coalescing.
It also makes `OptimizeSkewedJoin` more robust, as we don't need to worry about a skewed partition being coalesced with a small partition and breaks skew join handling.
It also helps with https://github.com/apache/spark/pull/31653 , which needs to move `OptimizeSkewedJoin` to an earlier phase and run before `CoalesceShufflePartitions`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
new UT and existing tests
Closes#32594 from cloud-fan/shuffle.
Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
* remove `EliminateUnnecessaryJoin`, using `AQEPropagateEmptyRelation` instead.
* eliminate join, aggregate, limit, repartition, sort, generate which is beneficial.
### Why are the changes needed?
Make `EliminateUnnecessaryJoin` available with more case.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add test.
Closes#32602 from ulysses-you/SPARK-35455.
Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Addressed the dongjoon-hyun comments on the previous PR #30018.
Extended the `RemoveRedundantAggregates` rule to remove redundant aggregations in even more queries. For example in
```
dataset
.dropDuplicates()
.groupBy('a)
.agg(max('b))
```
the `dropDuplicates` is not needed, because the result on `max` does not depend on duplicate values.
### Why are the changes needed?
Improve performance.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
Closes#31914 from tanelk/SPARK-33122_redundant_aggs_followup.
Lead-authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Co-authored-by: Tanel Kiis <tanel.kiis@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
This PR fixes an issue that `RemoveRedundantProjects` removes `ProjectExec` which is for generating `UnsafeRow`.
In `DataSourceV2Strategy`, `ProjectExec` will be inserted to ensure internal rows are `UnsafeRow`.
```
private def withProjectAndFilter(
project: Seq[NamedExpression],
filters: Seq[Expression],
scan: LeafExecNode,
needsUnsafeConversion: Boolean): SparkPlan = {
val filterCondition = filters.reduceLeftOption(And)
val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
if (withFilter.output != project || needsUnsafeConversion) {
ProjectExec(project, withFilter)
} else {
withFilter
}
}
...
case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) =>
// projection and filters were already pushed down in the optimizer.
// this uses PhysicalOperation to get the projection and ensure that if the batch scan does
// not support columnar, a projection is added to convert the rows to UnsafeRow.
val batchExec = BatchScanExec(relation.output, relation.scan)
withProjectAndFilter(project, filters, batchExec, !batchExec.supportsColumnar) :: Nil
```
So, the hierarchy of the partial tree should be like `ProjectExec(FilterExec(BatchScan))`.
But `RemoveRedundantProjects` doesn't consider this type of hierarchy, leading `ClassCastException`.
A concreate example to reproduce this issue is reported:
```
import scala.collection.JavaConverters._
import org.apache.iceberg.{PartitionSpec, TableProperties}
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.spark.sql.{DataFrame, QueryTest, SparkSession}
import org.apache.spark.sql.internal.SQLConf
class RemoveRedundantProjectsTest extends QueryTest {
override val spark: SparkSession = SparkSession
.builder()
.master("local[4]")
.config("spark.driver.bindAddress", "127.0.0.1")
.appName(suiteName)
.getOrCreate()
test("RemoveRedundantProjects removes non-redundant projects") {
withSQLConf(
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED.key -> "true") {
withTempDir { dir =>
val path = dir.getCanonicalPath
val data = spark.range(3).toDF
val table = new HadoopTables().create(
SparkSchemaUtil.convert(data.schema),
PartitionSpec.unpartitioned(),
Map(TableProperties.WRITE_NEW_DATA_LOCATION -> path).asJava,
path)
data.write.format("iceberg").mode("overwrite").save(path)
table.refresh()
val df = spark.read.format("iceberg").load(path)
val dfX = df.as("x")
val dfY = df.as("y")
val join = dfX.filter(dfX("id") > 0).join(dfY, "id")
join.explain("extended")
assert(join.count() == 2)
}
}
}
}
```
```
[info] - RemoveRedundantProjects removes non-redundant projects *** FAILED ***
[info] org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 4) (xeroxms100.northamerica.corp.microsoft.com executor driver): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
[info] at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
[info] at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
```
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New test.
Closes#32606 from sarutak/fix-project-removal-issue.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Use `SpecificInternalRow` instead of `GenericInternalRow` to avoid boxing / unboxing cost.
### Why are the changes needed?
Since it doesn't know the input row schema, `GenericInternalRow` potentially need to apply boxing for input arguments. It's better to use `SpecificInternalRow` instead since we know input data types.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes#32647 from sunchao/specific-input-row.
Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR fixes a bug with subexpression elimination for CaseWhen statements. https://github.com/apache/spark/pull/30245 added support for creating subexpressions that are present in all branches of conditional statements. However, for a statement to be in "all branches" of a CaseWhen statement, it must also be in the elseValue.
### Why are the changes needed?
Fix a bug where a subexpression can be created and run for branches of a conditional that don't pass. This can cause issues especially with a UDF in a branch that gets executed assuming the condition is true.
### Does this PR introduce _any_ user-facing change?
Yes, fixes a potential bug where a UDF could be eagerly executed even though it might expect to have already passed some form of validation. For example:
```
val col = when($"id" < 0, myUdf($"id"))
spark.range(1).select(when(col > 0, col)).show()
```
`myUdf($"id")` is considered a subexpression and eagerly evaluated, because it is pulled out as a common expression from both executions of the when clause, but if `id >= 0` it should never actually be run.
### How was this patch tested?
Updated existing test with new case.
Closes#32595 from Kimahriman/bug-case-subexpr-elimination.
Authored-by: Adam Binford <adamq43@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
### What changes were proposed in this pull request?
This patch fixes a bug when dealing with common expressions in conditional expressions such as `CaseWhen` during subexpression elimination.
For example, previously we find common expressions among conditions of `CaseWhen`, but children expressions are also counted into. We should not count these children expressions as common expressions.
### Why are the changes needed?
If the redundant children expressions are counted as common expressions too, they will be redundantly evaluated and miss the subexpression elimination opportunity.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added tests.
Closes#32559 from viirya/SPARK-35410.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR proposes to avoid wrapping if-else to the constant literals for `percentage` and `accuracy` in `percentile_approx`. They are expected to be literals (or foldable expressions).
Pivot works by two phrase aggregations, and it works with manipulating the input to `null` for non-matched values (pivot column and value).
Note that pivot supports an optimized version without such logic with changing input to `null` for some types (non-nested types basically). So the issue fixed by this PR is only for complex types.
```scala
val df = Seq(
("a", -1.0), ("a", 5.5), ("a", 2.5), ("b", 3.0), ("b", 5.2)).toDF("type", "value")
.groupBy().pivot("type", Seq("a", "b")).agg(
percentile_approx(col("value"), array(lit(0.5)), lit(10000)))
df.show()
```
**Before:**
```
org.apache.spark.sql.AnalysisException: cannot resolve 'percentile_approx((IF((type <=> CAST('a' AS STRING)), value, CAST(NULL AS DOUBLE))), (IF((type <=> CAST('a' AS STRING)), array(0.5D), NULL)), (IF((type <=> CAST('a' AS STRING)), 10000, CAST(NULL AS INT))))' due to data type mismatch: The accuracy or percentage provided must be a constant literal;
'Aggregate [percentile_approx(if ((type#7 <=> cast(a as string))) value#8 else cast(null as double), if ((type#7 <=> cast(a as string))) array(0.5) else cast(null as array<double>), if ((type#7 <=> cast(a as string))) 10000 else cast(null as int), 0, 0) AS a#16, percentile_approx(if ((type#7 <=> cast(b as string))) value#8 else cast(null as double), if ((type#7 <=> cast(b as string))) array(0.5) else cast(null as array<double>), if ((type#7 <=> cast(b as string))) 10000 else cast(null as int), 0, 0) AS b#18]
+- Project [_1#2 AS type#7, _2#3 AS value#8]
+- LocalRelation [_1#2, _2#3]
```
**After:**
```
+-----+-----+
| a| b|
+-----+-----+
|[2.5]|[3.0]|
+-----+-----+
```
### Why are the changes needed?
To make percentile_approx work with pivot as expected
### Does this PR introduce _any_ user-facing change?
Yes. It threw an exception but now it returns a correct result as shown above.
### How was this patch tested?
Manually tested and unit test was added.
Closes#32619 from HyukjinKwon/SPARK-35480.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/32411, to fix a mistake and use `sparkSession.sessionState.newHadoopConf` which includes SQL configs instead of `sparkSession.sparkContext.hadoopConfiguration` .
### Why are the changes needed?
fix mistake
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing tests
Closes#32618 from cloud-fan/follow1.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Kent Yao <yao@apache.org>
### What changes were proposed in this pull request?
This patch sorts equivalent expressions based on their child-parent relation.
### Why are the changes needed?
`EquivalentExpressions` maintains a map of equivalent expressions. It is `HashMap` now so the insertion order is not guaranteed to be preserved later. Subexpression elimination relies on retrieving subexpressions from the map. If there is child-parent relationships among the subexpressions, we want the child expressions come first than parent expressions, so we can replace child expressions in parent expressions with subexpression evaluation.
For example, we have two different expressions `Add(Literal(1), Literal(2))` and `Add(Literal(3), add)`.
Case 1: child subexpr comes first.
```scala
addExprTree(add)
addExprTree(Add(Literal(3), add))
addExprTree(Add(Literal(3), add))
```
Case 2: parent subexpr comes first. For this case, we need to sort equivalent expressions.
```
addExprTree(Add(Literal(3), add)) => We add `Add(Literal(3), add)` into the map first, then add `add` into the map
addExprTree(add)
addExprTree(Add(Literal(3), add))
```
As we are going to sort equivalent expressions at all, we don't need `LinkedHashMap` but just do sorting.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added tests.
Closes#32586 from viirya/use-listhashmap.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/32622 to fix a test case.
### Why are the changes needed?
Fix a wrong test case name and fix the test case to cause the expected error correctly.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs.
Closes#32623 from dongjoon-hyun/SPARK-34558.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/31671https://github.com/apache/spark/pull/31671 qualifies the warehouse at the beginning, which may fail Spark startup if something goes wrong, like the underlying FileSystem can't be initialized.
This PR falls back to the old behavior and leave the warehouse path unqualified if qualifying fails.
### Why are the changes needed?
Fix a regression. It's important to be always able to start Spark app (e.g. spark-shell), so that we can debug.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
a new test case
Closes#32622 from cloud-fan/follow2.
Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Change the type of `DATASET_ID_TAG` from `Long` to `HashSet[Long]` to allow the logical plan to match multiple datasets.
### Why are the changes needed?
During the transformation from one Dataset to another Dataset, the DATASET_ID_TAG of logical plan won't change if the plan itself doesn't change:
b5241c97b1/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (L234-L237)
However, dataset id always changes even if the logical plan doesn't change:
b5241c97b1/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (L207-L208)
And this can lead to the mismatch between dataset's id and col's __dataset_id. E.g.,
```scala
test("SPARK-28344: fail ambiguous self join - Dataset.colRegex as column ref") {
// The test can fail if we change it to:
// val df1 = spark.range(3).toDF()
// val df2 = df1.filter($"id" > 0).toDF()
val df1 = spark.range(3)
val df2 = df1.filter($"id" > 0)
withSQLConf(
SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true",
SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
assertAmbiguousSelfJoin(df1.join(df2, df1.colRegex("id") > df2.colRegex("id")))
}
}
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests.
Closes#32616 from Ngone51/fix-ambiguous-join.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
### Why are the changes needed?
Fix scala compile error.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes#32617 from ulysses-you/scala2-13.
Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Add tests to check the `EXCEPTION` rebase mode explicitly in the datasources:
- Parquet: `DATE` type and `TIMESTAMP`: `INT96`, `TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS`
- Avro: `DATE` type and `TIMESTAMP`: `timestamp-millis` and `timestamp-micros`.
### Why are the changes needed?
1. To improve test coverage
2. The `EXCEPTION` rebase mode should be checked independently from the default settings.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *AvroV2Suite"
$ build/sbt "test:testOnly *ParquetRebaseDatetimeV1Suite"
```
Closes#32574 from MaxGekk/test-rebase-exception.
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR group exception messages in `sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst`.
### Why are the changes needed?
It will largely help with standardization of error messages and its maintenance.
### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.
### How was this patch tested?
No new tests - pass all original tests to make sure it doesn't break any existing behavior.
Closes#32478 from beliefer/SPARK-35063.
Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR proposes to format strings correctly for `PushedFilters`. For example, `explain()` for a query below prints `v in (array('a'))` as `PushedFilters: [In(v, [WrappedArray(a)])]`;
```
scala> sql("create table t (v array<string>) using parquet")
scala> sql("select * from t where v in (array('a'), null)").explain()
== Physical Plan ==
*(1) Filter v#4 IN ([a],null)
+- FileScan parquet default.t[v#4] Batched: false, DataFilters: [v#4 IN ([a],null)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-3.1.1-bin-hadoop2.7/spark-warehouse/t], PartitionFilters: [], PushedFilters: [In(v, [WrappedArray(a),null])], ReadSchema: struct<v:array<string>>
```
This PR makes `explain()` print it as `PushedFilters: [In(v, [[a]])]`;
```
scala> sql("select * from t where v in (array('a'), null)").explain()
== Physical Plan ==
*(1) Filter v#4 IN ([a],null)
+- FileScan parquet default.t[v#4] Batched: false, DataFilters: [v#4 IN ([a],null)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-3.1.1-bin-hadoop2.7/spark-warehouse/t], PartitionFilters: [], PushedFilters: [In(v, [[a],null])], ReadSchema: struct<v:array<string>>
```
NOTE: This PR includes a bugfix caused by #32577 (See the cloud-fan comment: https://github.com/apache/spark/pull/32577/files#r636108150).
### Why are the changes needed?
To improve explain strings.
### Does this PR introduce _any_ user-facing change?
Yes, this PR improves the explain strings for pushed-down filters.
### How was this patch tested?
Added tests in `SQLQueryTestSuite`.
Closes#32615 from maropu/ExplainPartitionFilters.
Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR reduces the execution time of `DeduplicateRelations` by:
1) use `Set` instead `Seq` to check duplicate relations
2) avoid plan output traverse and attribute rewrites when there are no changes in the children plan
### Why are the changes needed?
Rule `DeduplicateRelations` is slow.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Run `TPCDSQuerySuite` and checked the run time of `DeduplicateRelations`. The time has been reduced by 77.9% after this PR.
Closes#32590 from Ngone51/improve-dedup.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Gengliang Wang <ltnwgl@gmail.com>
### What changes were proposed in this pull request?
Print the invalid value in config validation error message for `checkValue` just like `checkValues`
### Why are the changes needed?
Invalid configuration values may come in many ways, this PR can help different kinds of users or developers to identify what the config the error is related to
### Does this PR introduce _any_ user-facing change?
yes, but only error msg
### How was this patch tested?
yes, modified tests
Closes#32600 from yaooqinn/SPARK-35456.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
CTAS with location clause acts as an insert overwrite. This can cause problems when there are subdirectories within a location directory.
This causes some users to accidentally wipe out directories with very important data. We should not allow CTAS with location to a non-empty directory.
### Why are the changes needed?
Hive already handled this scenario: HIVE-11319
Steps to reproduce:
```scala
sql("""create external table `demo_CTAS`( `comment` string) PARTITIONED BY (`col1` string, `col2` string) STORED AS parquet location '/tmp/u1/demo_CTAS'""")
sql("""INSERT OVERWRITE TABLE demo_CTAS partition (col1='1',col2='1') VALUES ('abc')""")
sql("select* from demo_CTAS").show
sql("""create table ctas1 location '/tmp/u2/ctas1' as select * from demo_CTAS""")
sql("select* from ctas1").show
sql("""create table ctas2 location '/tmp/u2' as select * from demo_CTAS""")
```
Before the fix: Both create table operations will succeed. But values in table ctas1 will be replaced by ctas2 accidentally.
After the fix: `create table ctas2...` will throw `AnalysisException`:
```
org.apache.spark.sql.AnalysisException: CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory /tmp/u2 . To allow overwriting the existing non-empty directory, set 'spark.sql.legacy.allowNonEmptyLocationInCTAS' to true.
```
### Does this PR introduce _any_ user-facing change?
Yes, if the location directory is not empty, CTAS with location will throw AnalysisException
```
sql("""create table ctas2 location '/tmp/u2' as select * from demo_CTAS""")
```
```
org.apache.spark.sql.AnalysisException: CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory /tmp/u2 . To allow overwriting the existing non-empty directory, set 'spark.sql.legacy.allowNonEmptyLocationInCTAS' to true.
```
`CREATE TABLE AS SELECT` with non-empty `LOCATION` will throw `AnalysisException`. To restore the behavior before Spark 3.2, need to set `spark.sql.legacy.allowNonEmptyLocationInCTAS` to `true`. , default value is `false`.
Updated SQL migration guide.
### How was this patch tested?
Test case added in SQLQuerySuite.scala
Closes#32411 from vinodkc/br_fixCTAS_nonempty_dir.
Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
As title. Fixed two places where the documentation for window operator has some error.
### Why are the changes needed?
Help people read code for window operator more easily in the future.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes#32585 from c21/minor-doc.
Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
AQE has an optimization where it attempts to reuse compatible exchanges but it does not take into account whether the exchanges are columnar or not, resulting in incorrect reuse under some circumstances.
This PR simply changes the key used to lookup cached stages. It now uses the canonicalized form of the new query stage (potentially created by a plugin) rather than using the canonicalized form of the original exchange.
### Why are the changes needed?
When using the [RAPIDS Accelerator for Apache Spark](https://github.com/NVIDIA/spark-rapids) we sometimes see a new query stage correctly create a row-based exchange and then Spark replaces it with a cached columnar exchange, which is not compatible, and this causes queries to fail.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
The patch has been tested with the query that highlighted this issue. I looked at writing unit tests for this but it would involve implementing a mock columnar exchange in the tests so would be quite a bit of work. If anyone has ideas on other ways to test this I am happy to hear them.
Closes#32195 from andygrove/SPARK-35093.
Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
### What changes were proposed in this pull request?
Updating column stats for Union operator stats estimation
### Why are the changes needed?
This is a followup PR to update the null count also in the Union stats operator estimation. https://github.com/apache/spark/pull/30334
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Updated UTs, manual testing
Closes#32494 from shahidki31/shahid/updateNullCountForUnion.
Lead-authored-by: shahid <shahidki31@gmail.com>
Co-authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
This PR adds `sentences`, a string function, which is present as of `2.0.0` but missing in `functions.{scala,py}`.
### Why are the changes needed?
This function can be only used from SQL for now.
It's good if we can use this function from Scala/Python code as well as SQL.
### Does this PR introduce _any_ user-facing change?
Yes. Users can use this function from Scala and Python.
### How was this patch tested?
New test.
Closes#32566 from sarutak/sentences-function.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
### What changes were proposed in this pull request?
Update histogram statistics for RANGE operator stats estimation
### Why are the changes needed?
If histogram optimization is enabled, this statistics can be used in various cost based optimizations.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added UTs. Manual test.
Closes#32498 from shahidki31/shahid/histogram.
Lead-authored-by: shahid <shahidki31@gmail.com>
Co-authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
1. In HadoopMapReduceCommitProtocol, create parent directory before renaming custom partition path staging files
2. In InMemoryCatalog and HiveExternalCatalog, create new partition directory before renaming old partition path
3. Check return value of FileSystem#rename, if false, throw exception to avoid silent data loss cause by rename failure
4. Change DebugFilesystem#rename behavior to make it match HDFS's behavior (return false without rename when dst parent directory not exist)
### Why are the changes needed?
Depends on FileSystem#rename implementation, when destination directory does not exist, file system may
1. return false without renaming file nor throwing exception (e.g. HDFS), or
2. create destination directory, rename files, and return true (e.g. LocalFileSystem)
In the first case above, renames in HadoopMapReduceCommitProtocol for custom partition path will fail silently if the destination partition path does not exist. Failed renames can happen when
1. dynamicPartitionOverwrite == true, the custom partition path directories are deleted by the job before the rename; or
2. the custom partition path directories do not exist before the job; or
3. something else is wrong when file system handle `rename`
The renames in MemoryCatalog and HiveExternalCatalog for partition renaming also have similar issue.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Modified DebugFilesystem#rename, and added new unit tests.
Without the fix in src code, five InsertSuite tests and one AlterTableRenamePartitionSuite test failed:
InsertSuite.SPARK-20236: dynamic partition overwrite with custom partition path (existing test with modified FS)
```
== Results ==
!== Correct Answer - 1 == == Spark Answer - 0 ==
struct<> struct<>
![2,1,1]
```
InsertSuite.SPARK-35106: insert overwrite with custom partition path
```
== Results ==
!== Correct Answer - 1 == == Spark Answer - 0 ==
struct<> struct<>
![2,1,1]
```
InsertSuite.SPARK-35106: dynamic partition overwrite with custom partition path
```
== Results ==
!== Correct Answer - 2 == == Spark Answer - 1 ==
!struct<> struct<i:int,part1:int,part2:int>
[1,1,1] [1,1,1]
![1,1,2]
```
InsertSuite.SPARK-35106: Throw exception when rename custom partition paths returns false
```
Expected exception org.apache.spark.SparkException to be thrown, but no exception was thrown
```
InsertSuite.SPARK-35106: Throw exception when rename dynamic partition paths returns false
```
Expected exception org.apache.spark.SparkException to be thrown, but no exception was thrown
```
AlterTableRenamePartitionSuite.ALTER TABLE .. RENAME PARTITION V1: multi part partition (existing test with modified FS)
```
== Results ==
!== Correct Answer - 1 == == Spark Answer - 0 ==
struct<> struct<>
![3,123,3]
```
Closes#32530 from YuzhouSun/SPARK-35106.
Authored-by: Yuzhou Sun <yuzhosun@amazon.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR fixes an issue that streaming queries with V2Relation can have redundant `ProjectExec` in its physical plan.
You can easily reproduce this issue with the following code.
```
import org.apache.spark.sql.streaming.Trigger
val query = spark.
readStream.
format("rate").
option("rowsPerSecond", 1000).
option("rampUpTime", "10s").
load().
selectExpr("timestamp", "100", "value").
writeStream.
format("console").
trigger(Trigger.ProcessingTime("5 seconds")).
// trigger(Trigger.Continuous("5 seconds")). // You can reproduce with continuous processing too.
outputMode("append").
start()
```
The plan tree is here.
![ss-before](https://user-images.githubusercontent.com/4736016/118454996-ec439800-b733-11eb-8cd8-ed8af73a91b8.png)
### Why are the changes needed?
For better performance.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
I run the same code above and get the following plan tree.
![ss-after](https://user-images.githubusercontent.com/4736016/118455755-1bf2a000-b734-11eb-999e-4b8c19ad34d7.png)
Closes#32570 from sarutak/fix-redundant-projectexec.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
SPARK-35253 upgraded janino from 3.0.16 to 3.1.4, `ClassBodyEvaluator` provides the `getBytecodes` method to get
the mapping from `ClassFile#getThisClassName` to `ClassFile#toByteArray` directly in this version and we don't need to get this variable by reflection api anymore.
So the main purpose of this pr is simplify the way to get `bytecodes` from `ClassBodyEvaluator` in `CodeGenerator#updateAndGetCompilationStats` method.
### Why are the changes needed?
Code simplification.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Pass the Jenkins or GitHub Action
- Manual test:
1. Define a code fragment to be tested, for example:
```
val codeBody = s"""
public java.lang.Object generate(Object[] references) {
return new TestMetricCode(references);
}
class TestMetricCode {
public TestMetricCode(Object[] references) {
}
public long sumOfSquares(long left, long right) {
return left * left + right * right;
}
}
"""
```
2. Create a `ClassBodyEvaluator` and `cook` the `codeBody` as above, the process of creating `ClassBodyEvaluator` can extract from `CodeGenerator#doCompile` method.
3. Get `bytecodes` using `ClassBodyEvaluator#getBytecodes` api(after this pr) and reflection api(before this pr) respectively, then assert that they are the same. If the `bytecodes` not changed, we can be sure that metrics state will not change. The test code example as follows:
```
import scala.collection.JavaConverters._
val bytecodesFromApi = evaluator.getBytecodes.asScala
val bytecodesFromReflectionApi = {
val scField = classOf[ClassBodyEvaluator].getDeclaredField("sc")
scField.setAccessible(true)
val compiler = scField.get(evaluator).asInstanceOf[SimpleCompiler]
val loader = compiler.getClassLoader.asInstanceOf[ByteArrayClassLoader]
val classesField = loader.getClass.getDeclaredField("classes")
classesField.setAccessible(true)
classesField.get(loader).asInstanceOf[java.util.Map[String, Array[Byte]]].asScala
}
assert(bytecodesFromApi == bytecodesFromReflectionApi)
```
Closes#32536 from LuciferYang/SPARK-35253-FOLLOWUP.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
Write out Seq of product objects which contain TreeNode, to avoid the cases as described in https://issues.apache.org/jira/browse/SPARK-35411 that essential information will be ignored and just written out as null values. These information are necessary to understand the query plans.
### Why are the changes needed?
Information like cteRelations in With node, and branches in CaseWhen expression are necessary to understand the query plans, they should be written out to the result json string.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT case added.
Closes#32557 from ivoson/plan-json-fix.
Authored-by: Tengfei Huang <tengfei.h@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
To pass the TPCDS-related plan stability tests in scala-2.13, this PR proposes to fix two things below;
- (1) Sorts elements in the predicate `InSet` and the source filter `In` for printing their nodes.
- (2) Formats nested collection elements (`Seq`, `Array`, and `Set`) recursively in `TreeNode.argString`.
As for (1), it seems v2.12/v2.13 prints `Set` elements with a different order, so we need to sort them explicitly. As for (2), the `Seq` implementation is different between v2.12/v2.13, so we need to format nested `Seq` elements correctly to hide the name of its implementation (See an example below);
```
(74) Expand [codegen id : 20]
Input [5]: [sales#41, RETURNS#42, profit#43, channel#44, id#45]
-Arguments: [ArrayBuffer(sales#41, returns#42, ... <-- scala-2.12
+Arguments: [Vector(sales#41, returns#42, ... <-- scala-2.13
+Arguments: [[(sales#41, returns#42, ... <-- the proposed fix to hide the name of its implementation
```
### Why are the changes needed?
To pass the tests in Scala v2.13.
### Does this PR introduce _any_ user-facing change?
Yes, this fix changes query explain strings.
### How was this patch tested?
Manually checked.
Closes#32577 from maropu/FixTPCDSTestIssueInScala213.
Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
When creating `Invoke` and `StaticInvoke` for `ScalarFunction`'s magic method, set `propagateNull` to false.
### Why are the changes needed?
When `propgagateNull` is true (which is the default value), `Invoke` and `StaticInvoke` will return null if any of the argument is null. For scalar function this is incorrect, as we should leave the logic to function implementation instead.
### Does this PR introduce _any_ user-facing change?
Yes. Now null arguments shall be properly handled with magic method.
### How was this patch tested?
Added new tests.
Closes#32553 from sunchao/SPARK-35389.
Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
To pass `subquery/scalar-subquery/scalar-subquery-select.sql` (`SQLQueryTestSuite`) in Scala v2.13, this PR proposes to change the aggregate expr of a test query in the file from `collect_set(...)` to `sort_array(collect_set(...))` because `collect_set` depends on the `mutable.HashSet` implementation and elements in the set are printed in a different order in Scala v2.12/v2.13.
### Why are the changes needed?
To pass the test in Scala v2.13.
### Does this PR introduce _any_ user-facing change?
No, dev-only.
### How was this patch tested?
Manually checked.
Closes#32578 from maropu/FixSQLTestIssueInScala213.
Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/30309 added a configuration (disabled by default) that simplifies the error messages from Python UDFS, which removed internal stacktrace from Python workers:
```python
from pyspark.sql.functions import udf; spark.range(10).select(udf(lambda x: x/0)("id")).collect()
```
**Before**
```
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/.../python/pyspark/sql/dataframe.py", line 427, in show
print(self._jdf.showString(n, 20, vertical))
File "/.../python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
File "/.../python/pyspark/sql/utils.py", line 127, in deco
raise_from(converted)
File "<string>", line 3, in raise_from
pyspark.sql.utils.PythonException:
An exception was thrown from Python worker in the executor:
Traceback (most recent call last):
File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
process()
File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
serializer.dump_stream(out_iter, outfile)
File "/.../python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/.../python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
for obj in iterator:
File "/.../python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
for item in iterator:
File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
return lambda *a: f(*a)
File "/.../python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
return f(*args, **kwargs)
File "<stdin>", line 1, in <lambda>
ZeroDivisionError: division by zero
```
**After**
```
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/.../python/pyspark/sql/dataframe.py", line 427, in show
print(self._jdf.showString(n, 20, vertical))
File "/.../python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
File "/.../python/pyspark/sql/utils.py", line 127, in deco
raise_from(converted)
File "<string>", line 3, in raise_from
pyspark.sql.utils.PythonException:
An exception was thrown from Python worker in the executor:
Traceback (most recent call last):
File "<stdin>", line 1, in <lambda>
ZeroDivisionError: division by zero
```
Note that the traceback (`return f(*args, **kwargs)`) is almost always same - I would say more than 99%. For 1% case, we can guide developers to enable this configuration for further debugging.
In Databricks, it has been enabled for around 6 months, and I have had zero negative feedback on it.
### Why are the changes needed?
To show simplified exception messages to end users.
### Does this PR introduce _any_ user-facing change?
Yes, it will hide the internal Python worker traceback.
### How was this patch tested?
Existing test cases should cover.
Closes#32569 from HyukjinKwon/SPARK-35419.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Fix test failure under Scala 2.13 by making test `ScalaFunction` `StrLenMagic` public.
### Why are the changes needed?
A few tests are failing when using Scala 2.13 with error message like the following:
```
[info] Cause: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 35, Column 121: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 35, Column 121: No a
pplicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public int org.apache.spark.sql.connector.DataSourceV2FunctionSuite$StrLenMagic$.invoke(org.apache.spark.
unsafe.types.UTF8String)"
[info] at org.apache.spark.sql.errors.QueryExecutionErrors$.compilerError(QueryExecutionErrors.scala:387)
[info] at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1415)
[info] at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1501)
```
This seems to be caused by the fact that the `StrLenMagic` is using `private` scope. After removing the `private` keyword the tests are now passing.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
```
$ dev/change-scala-version.sh 2.13
$ build/sbt "sql/testOnly *.DataSourceV2FunctionSuite" -Pscala-2.13
```
Closes#32575 from sunchao/SPARK-34981-follow-up.
Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR is used to fix this bug:
```
set spark.sql.legacy.charVarcharAsString=true;
create table chartb01(a char(3));
insert into chartb01 select 'aaaaa';
```
here we expect the data of table chartb01 is 'aaa', but it runs failed.
### Why are the changes needed?
Improve backward compatibility
```
spark-sql>
> create table tchar01(col char(2)) using parquet;
Time taken: 0.767 seconds
spark-sql>
> insert into tchar01 select 'aaa';
ERROR | Executor task launch worker for task 0.0 in stage 0.0 (TID 0) | Aborting task | org.apache.spark.util.Utils.logError(Logging.scala:94)
java.lang.RuntimeException: Exceeds char/varchar type length limitation: 2
at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.trimTrailingSpaces(CharVarcharCodegenUtils.java:31)
at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.charTypeWriteSideCheck(CharVarcharCodegenUtils.java:44)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:279)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1500)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:288)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:212)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1466)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
### Does this PR introduce _any_ user-facing change?
No (the legacy config is false by default).
### How was this patch tested?
Added unit tests.
Closes#32501 from fhygh/master.
Authored-by: fhygh <283452027@qq.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Spark doesn't support aggregate functions with mixed outer and local references. This PR applies this check earlier to fail with a clear error message instead of some weird ones, and simplifies the related code in `SubExprUtils.getOuterReferences`. This PR also refines the error message a bit.
### Why are the changes needed?
better error message
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
updated tests
Closes#32503 from cloud-fan/try.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Introduction: this PR is a part of SPARK-10816 (`EventTime based sessionization (session window)`). Please refer #31937 to see the overall view of the code change. (Note that code diff could be diverged a bit.)
### What changes were proposed in this pull request?
This PR introduces UpdatingSessionsIterator, which analyzes neighbor elements and adjust session information on elements.
UpdatingSessionsIterator calculates and updates the session window for each element in the given iterator, which makes elements in the same session window having same session spec. Downstream can apply aggregation to finally merge these elements bound to the same session window.
UpdatingSessionsIterator works on the precondition that given iterator is sorted by "group keys + start time of session window", and the iterator still retains the characteristic of the sort.
UpdatingSessionsIterator copies the elements to safely update on each element, as well as buffers elements which are bound to the same session window. Due to such overheads, MergingSessionsIterator which will be introduced via SPARK-34889 should be used whenever possible.
This PR also introduces UpdatingSessionsExec which is the physical node on leveraging UpdatingSessionsIterator to sort the input rows and updates session information on input rows.
### Why are the changes needed?
This part is a one of required on implementing SPARK-10816.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New test suite added.
Closes#31986 from HeartSaVioR/SPARK-34888-SPARK-10816-PR-31570-part-1.
Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
This fixes the compilation error due to the logical conflicts between https://github.com/apache/spark/pull/31776 and https://github.com/apache/spark/pull/29642 .
### Why are the changes needed?
To recover compilation.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Closes#32568 from wangyum/HOT-FIX.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>