### What changes were proposed in this pull request?
Extend DayTimeIntervalType to support interval fields. Valid interval field values:
- 0 (DAY)
- 1 (HOUR)
- 2 (MINUTE)
- 3 (SECOND)
After the changes, the following day-time interval types are supported:
1. `DayTimeIntervalType(0, 0)` or `DayTimeIntervalType(DAY, DAY)`
2. `DayTimeIntervalType(0, 1)` or `DayTimeIntervalType(DAY, HOUR)`
3. `DayTimeIntervalType(0, 2)` or `DayTimeIntervalType(DAY, MINUTE)`
4. `DayTimeIntervalType(0, 3)` or `DayTimeIntervalType(DAY, SECOND)`. **It is the default one**. The second fraction precision is microseconds.
5. `DayTimeIntervalType(1, 1)` or `DayTimeIntervalType(HOUR, HOUR)`
6. `DayTimeIntervalType(1, 2)` or `DayTimeIntervalType(HOUR, MINUTE)`
7. `DayTimeIntervalType(1, 3)` or `DayTimeIntervalType(HOUR, SECOND)`
8. `DayTimeIntervalType(2, 2)` or `DayTimeIntervalType(MINUTE, MINUTE)`
9. `DayTimeIntervalType(2, 3)` or `DayTimeIntervalType(MINUTE, SECOND)`
10. `DayTimeIntervalType(3, 3)` or `DayTimeIntervalType(SECOND, SECOND)`
### Why are the changes needed?
In the current implementation, Spark supports only `interval day to second` but the SQL standard allows to specify the start and end fields. The changes will allow to follow ANSI SQL standard more precisely.
### Does this PR introduce _any_ user-facing change?
Yes but `DayTimeIntervalType` has not been released yet.
### How was this patch tested?
By existing test suites.
Closes#32849 from MaxGekk/day-time-interval-type-units.
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
Collect observed metrics from cached and adaptive execution sub-trees.
### Why are the changes needed?
Currently persisting/caching will hide all observed metrics in that sub-tree from reaching the `QueryExecutionListeners`. Adaptive query execution can also hide the metrics from reaching `QueryExecutionListeners`.
### Does this PR introduce _any_ user-facing change?
Bugfix
### How was this patch tested?
New UTs
Closes#32862 from tanelk/SPARK-35695_collect_metrics_persist.
Lead-authored-by: Tanel Kiis <tanel.kiis@gmail.com>
Co-authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
It seems like spark inner join is performing a cartesian join in self joining using `joinWith`
To produce this issues:
```
val df = spark.range(0,3)
df.joinWith(df, df("id") === df("id")).show()
```
Before this pull request, the result is
+---+---+
| _1 | _2 |
+---+---+
| 0 | 0 |
| 0 | 1 |
| 0 | 2 |
| 1 | 0 |
| 1 | 1 |
| 1 | 2 |
| 2 | 0 |
| 2 | 1 |
| 2 | 2 |
+---+---+
The expected result is
+---+---+
| _1 | _2 |
+---+---+
| 0 | 0 |
| 1 | 1 |
| 2 | 2 |
+---+---+
### Why are the changes needed?
correctness
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
add test
Closes#32863 from dgd-contributor/SPARK-35652_join_and_joinWith_in_seft_joining.
Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
1. Remove duplicated code in the form of `readXXX` in `VectorizedRleValuesReader`. For instance:
```java
public void readIntegers(
int total,
WritableColumnVector c,
int rowId,
int level,
VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
int n = Math.min(left, this.currentCount);
switch (mode) {
case RLE:
if (currentValue == level) {
data.readIntegers(n, c, rowId);
} else {
c.putNulls(rowId, n);
}
break;
case PACKED:
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putInt(rowId + i, data.readInteger());
} else {
c.putNull(rowId + i);
}
}
break;
}
rowId += n;
left -= n;
currentCount -= n;
}
}
```
and replace with:
```java
public void readBatch(
int total,
int offset,
WritableColumnVector values,
int maxDefinitionLevel,
VectorizedValuesReader valueReader,
ParquetVectorUpdater updater) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
int n = Math.min(left, this.currentCount);
switch (mode) {
case RLE:
if (currentValue == maxDefinitionLevel) {
updater.updateBatch(n, offset, values, valueReader);
} else {
values.putNulls(offset, n);
}
break;
case PACKED:
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == maxDefinitionLevel) {
updater.update(offset + i, values, valueReader);
} else {
values.putNull(offset + i);
}
}
break;
}
offset += n;
left -= n;
currentCount -= n;
}
}
```
where the `ParquetVectorUpdater` is type specific, and has different implementations under `updateBatch` and `update`. Together, this also changes code paths handling timestamp types to use the batch read API for decoding definition levels.
2. Similar to the above, this removes code duplication in `VectorizedColumnReader.decodeDictionaryIds`. Now different implementations are under `ParquetVectorUpdater.decodeSingleDictionaryId`.
### Why are the changes needed?
`VectorizedRleValuesReader` and `VectorizedColumnReader` are becoming increasingly harder to maintain, as any change touches the above logic **will need to be replicated in 20+ places**. The issue becomes even more serious when we are going to implement column index (for instance, see how the change [here](https://github.com/apache/spark/pull/32753/files#diff-a01e174e178366aadf07f64ee690d47d343b2ca416a4a2b2ea735887c22d5934R191) has to be replicated multiple times) and complex type support (in progress) for the vectorized path.
In addition, currently dictionary decoding (see `VectorizedColumnReader.decodeDictionaryIds`) and non-dictionary decoding are handled separately, and therefore the same (very complicated) branching logic based on input Spark & Parquet types have to be replicated in two places, which is another burden for code maintenance.
The original intention is for performance. However these days JIT compilers tend to be very effective on this and will inline virtual calls aggressively to eliminate the method invocation costs (see [this](https://shipilev.net/blog/2015/black-magic-method-dispatch/) and [this](http://insightfullogic.com/blog/2014/may/12/fast-and-megamorphic-what-influences-method-invoca/)). I've also done benchmarks using a modified `DataSourceReadBenchmark` and `DateTimeRebaseBenchmark` and the result is almost exact the same before and after the change. The results can be found [here](https://gist.github.com/sunchao/674afbf942ccc2370bdcfa33efb4471c), and [here's](https://github.com/sunchao/spark/tree/parquet-refactor) the source code.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
Closes#32777 from sunchao/SPARK-35640.
Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
### What changes were proposed in this pull request?
This pr upgrades built-in Hive to 2.3.9. Hive 2.3.9 changes:
- [HIVE-17155] - findConfFile() in HiveConf.java has some issues with the conf path
- [HIVE-24797] - Disable validate default values when parsing Avro schemas
- [HIVE-24608] - Switch back to get_table in HMS client for Hive 2.3.x
- [HIVE-21200] - Vectorization: date column throwing java.lang.UnsupportedOperationException for parquet
- [HIVE-21563] - Improve Table#getEmptyTable performance by disabling registerAllFunctionsOnce
- [HIVE-19228] - Remove commons-httpclient 3.x usage
### Why are the changes needed?
Fix regression caused by AVRO-2035.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
Closes#32750 from wangyum/SPARK-34512.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR fixes an issue that `Dataset.observe` doesn't work if `CollectMetricsExec` in a task handles multiple partitions.
If `coalesce` follows `observe` and the number of partitions shrinks after `coalesce`, `CollectMetricsExec` can handle multiple partitions in a task.
### Why are the changes needed?
The current implementation of `CollectMetricsExec` doesn't consider the case it can handle multiple partitions.
Because new `updater` is created for each partition even though those partitions belong to the same task, `collector.setState(updater)` raise an assertion error.
This is a simple reproducible example.
```
$ bin/spark-shell --master "local[1]"
scala> spark.range(1, 4, 1, 3).observe("my_event", count($"id").as("count_val")).coalesce(2).collect
```
```
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208)
at org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125)
at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New test.
Closes#32786 from sarutak/fix-collectmetricsexec.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Use `UnresolvedHint.resolved = child.resolved` instead `UnresolvedHint.resolved = false`, then the plan contains `UnresolvedHint` child can be optimized by rule in batch `Resolution`.
For instance, before this pr, the following plan can't be optimized by `ResolveReferences`.
```
!'Project [*]
+- SubqueryAlias __auto_generated_subquery_name
+- UnresolvedHint use_hash
+- Project [42 AS 42#10]
+- OneRowRelation
```
### Why are the changes needed?
fix hint in subquery bug
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New test.
Closes#32841 from cfmcgrady/SPARK-35673.
Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Add `PartitioningCollection` in EnsureRequirements during remove shuffle.
### Why are the changes needed?
Currently `EnsureRequirements` only check if child has semantic equal `HashPartitioning` and remove
redundant shuffle. We can enhance this case using `PartitioningCollection`.
### Does this PR introduce _any_ user-facing change?
Yes, plan might be changed.
### How was this patch tested?
Add test.
Closes#32815 from ulysses-you/shuffle-node.
Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
### What changes were proposed in this pull request?
This PR adds support for lateral subqueries. A lateral subquery is a subquery preceded by the `LATERAL` keyword in the FROM clause of a query that can reference columns in the preceding FROM items. For example:
```sql
SELECT * FROM t1, LATERAL (SELECT * FROM t2 WHERE t1.a = t2.c)
```
A new subquery expression`LateralSubquery` is used to represent a lateral subquery. It is similar to `ScalarSubquery` but can return multiple rows and columns. A new logical unary node `LateralJoin` is used to represent a lateral join.
Here is the analyzed plan for the above query:
```scala
Project [a, b, c, d]
+- LateralJoin lateral-subquery [a], Inner
: +- Project [c, d]
: +- Filter (outer(a) = c)
: +- Relation [c, d]
+- Relation [a, b]
```
Similar to a correlated subquery, a lateral subquery can be viewed as a dependent (nested loop) join where the evaluation of the right subtree depends on the current value of the left subtree. The same technique to decorrelate a subquery is used to decorrelate a lateral join:
```scala
Project [a, b, c, d]
+- LateralJoin lateral-subquery [a && a = c], Inner // pull up correlated predicates as join conditions
: +- Project [c, d]
: +- Relation [c, d]
+- Relation [a, b]
```
Then the lateral join can be rewritten into a normal join:
```scala
Join Inner (a = c)
:- Relation [a, b]
+- Relation [c, d]
```
#### Follow-ups:
1. Similar to rewriting correlated scalar subqueries, rewriting lateral joins is also subject to the COUNT bug (See SPARK-15370 for more details). This is **not** handled in the current PR as it requires a sizeable amount of refactoring. It will be addressed in a subsequent PR (SPARK-35551).
2. Currently Spark does use outer query references to resolve star expressions in subqueries. This is not lateral subquery specific and can be handled in a separate PR (SPARK-35618)
### Why are the changes needed?
To support an ANSI SQL feature.
### Does this PR introduce _any_ user-facing change?
Yes. It allows users to use lateral subqueries in the FROM clause of a query.
### How was this patch tested?
- Parser test: `PlanParserSuite.scala`
- Analyzer test: `ResolveSubquerySuite.scala`
- Optimizer test: `PullupCorrelatedPredicatesSuite.scala`
- SQL test: `join-lateral.sql`, `postgreSQL/join.sql`
Closes#32303 from allisonwang-db/spark-34382-lateral.
Lead-authored-by: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
The changed [unit test](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala#L566) was introduce in https://github.com/apache/spark/pull/21587, to fix the planner side of thing for stream-stream join. Ideally check the query result should catch the bug, but it would be better to add plan check to make the purpose of unit test more clearly and catch future bug from planner change.
### Why are the changes needed?
Improve unit test.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Changed test itself.
Closes#32836 from c21/ss-test.
Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Handle type coercion when resolving V2 function. In particular:
- prior to evaluating function arguments, insert cast whenever the argument type doesn't match the expected input type.
- use `BoundFunction.inputTypes()` to lookup magic method for scalar function
### Why are the changes needed?
For V2 functions, the actual argument types should not necessarily match those of the input types, and Spark should handle type coercion whenever it is needed.
### Does this PR introduce _any_ user-facing change?
Yes. Now V2 function resolution should be able to handle type coercion properly.
### How was this patch tested?
Added a few new tests.
Closes#32764 from sunchao/SPARK-35390.
Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR enhances `RepartitionByExpression` to make it coalesce partitions efficiently by AQE. Usually used to merge small files.
The basic logic is: Spark first tries to coalesce partitions, if it cannot be coalesced, then use the local shuffle reader to read data to avoid exchange the data over the network.
Usage:
```sql
SELECT /*+ REPARTITION */ * FROM t
```
```scala
df.repartition()
```
For example:
coalesce small output files | local shuffle reader
--- | ---
![image](https://user-images.githubusercontent.com/5399861/120772533-fc8cad00-c552-11eb-977e-5bb61b84cbe2.png)| ![image](https://user-images.githubusercontent.com/5399861/120772324-c6e7c400-c552-11eb-9daa-f6b5021fd1b9.png)
### Why are the changes needed?
Coalesce partitions efficiently.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
Closes#32781 from wangyum/SPARK-35650.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Write tests for timestamp without time zone in UDF as input parameters and results.
### Why are the changes needed?
It follows https://github.com/apache/spark/pull/31779 to improve test coverage.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test
Closes#32840 from gengliangwang/tswtzUDF.
Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
In the PR, I propose to extend Spark SQL API to accept `java.time.LocalDateTime` as an external type of recently added new Catalyst type - `TimestampWithoutTZ`. The Java class `java.time.LocalDateTime` has a similar semantic to ANSI SQL timestamp without timezone type, and it is the most suitable to be an external type for `TimestampWithoutTZType`. In more details:
* Added `TimestampWithoutTZConverter` which converts java.time.LocalDateTime instances to/from internal representation of the Catalyst type `TimestampWithoutTZType` (to Long type). The `TimestampWithoutTZConverter` object uses new methods of DateTimeUtils:
* localDateTimeToMicros() converts the input date time to the total length in microseconds.
* microsToLocalDateTime() obtains a java.time.LocalDateTime
* Support new type `TimestampWithoutTZType` in RowEncoder via the methods createDeserializerForLocalDateTime() and createSerializerForLocalDateTime().
* Extended the Literal API to construct literals from `java.time.LocalDateTime` instances.
### Why are the changes needed?
To allow users parallelization of `java.time.LocalDateTime` collections, and construct timestamp without time zone columns. Also to collect such columns back to the driver side.
### Does this PR introduce _any_ user-facing change?
The PR extends existing functionality. So, users can parallelize instances of the java.time.LocalDateTime class and collect them back.
```
scala> val ds = Seq(java.time.LocalDateTime.parse("1970-01-01T00:00:00")).toDS
ds: org.apache.spark.sql.Dataset[java.time.LocalDateTime] = [value: timestampwithouttz]
scala> ds.collect()
res0: Array[java.time.LocalDateTime] = Array(1970-01-01T00:00)
```
### How was this patch tested?
New unit tests
Closes#32814 from gengliangwang/LocalDateTime.
Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
Move `assume` into methods at `PythonUDFSuite`.
### Why are the changes needed?
When we run Spark test with such command:
`./build/mvn -Phadoop-2.7 -Phive -Phive-thriftserver -Pyarn -Pkubernetes clean test`
get this exception:
```
PythonUDFSuite:
org.apache.spark.sql.execution.python.PythonUDFSuite *** ABORTED ***
java.lang.RuntimeException: Unable to load a Suite class that was discovered in the runpath: org.apache.spark.sql.execution.python.PythonUDFSuite
at org.scalatest.tools.DiscoverySuite$.getSuiteInstance(DiscoverySuite.scala:81)
at org.scalatest.tools.DiscoverySuite.$anonfun$nestedSuites$1(DiscoverySuite.scala:38)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
```
The test env has no PYSpark module so it failed.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
manual
Closes#32833 from ulysses-you/SPARK-35687.
Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
The implementation for the save operation of RocksDBFileManager.
### Why are the changes needed?
Save all the files in the given local checkpoint directory as a committed version in DFS.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New UT added.
Closes#32582 from xuanyuanking/SPARK-35436.
Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
Currently, Spark eagerly executes commands on the caller side of `QueryExecution`, which is a bit hacky as `QueryExecution` is not aware of it and leads to confusion.
For example, if you run `sql("show tables").collect()`, you will see two queries with identical query plans in the web UI.
![image](https://user-images.githubusercontent.com/3182036/121193729-a72d0480-c8a0-11eb-8b12-379019607ad5.png)
![image](https://user-images.githubusercontent.com/3182036/121193822-bc099800-c8a0-11eb-9d2a-34ab1329e2f7.png)
![image](https://user-images.githubusercontent.com/3182036/121193845-c0ce4c00-c8a0-11eb-96d0-ef604a4dfab0.png)
The first query is triggered at `Dataset.logicalPlan`, which eagerly executes the command.
The second query is triggered at `Dataset.collect`, which is the normal query execution.
From the web UI, it's hard to tell that these two queries are caused by eager command execution.
This PR proposes to move the eager command execution to `QueryExecution`, and turn the command plan to `CommandResult` to indicate that command has been executed already. Now `sql("show tables").collect()` still triggers two queries, but the quey plans are not identical. The second query becomes:
![image](https://user-images.githubusercontent.com/3182036/121194850-b3659180-c8a1-11eb-9abf-2980f84f089d.png)
In addition to the UI improvements, this PR also has other benefits:
1. Simplifies code as caller side no need to worry about eager command execution. `QueryExecution` takes care of it.
2. It helps https://github.com/apache/spark/pull/32442 , where there can be more plan nodes above commands, and we need to replace commands with something like local relation that produces unsafe rows.
### Why are the changes needed?
Explained above.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
existing tests
Closes#32513 from beliefer/SPARK-35378.
Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: beliefer <beliefer@163.com>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Refactor LocalDateTimeUDT as YearUDT in UserDefinedTypeSuite
### Why are the changes needed?
As we are going to support java.time.LocalDateTime as an external type of TimestampWithoutTZ type https://github.com/apache/spark/pull/32814, registering java.time.LocalDateTime as UDT will cause test failures: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139469/testReport/
This PR is to unblock https://github.com/apache/spark/pull/32814.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
Closes#32824 from gengliangwang/UDTFollowUp.
Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
This PR fixes an issue that both key and value of state schema cannot accept long length (>65535 bytes) JSON.
To solve the problem explained below, JSON represented schema is divided into chunks whose maximum length is 65535 bytes, and each chunk is written by `DataOutputStream.writeUTF`.
As the solution changes the format of the schema, the version is also changes from `1` to `2` but old version schema is still acceptable to ensures backward compatibility.
### Why are the changes needed?
In the current implementation, writing state schema fails if the length of schema exceeds 65535 bytes and `UTFDataFormatException` is thrown.
It's due to the limitation of `DataOutputStream.writeUTF`.
`writeUTF` writes a length field first and it's 2 bytes width, meaning the maximum content length is limited to `2^16-1`=`65535` bytes.
https://docs.oracle.com/javase/8/docs/api/java/io/DataOutputStream.html#writeUTF-java.lang.String-
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New tests.
Closes#32788 from sarutak/fix-UTFDataFormatException.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
This patch removes the usage of putting null into StateStore.
### Why are the changes needed?
According to `get` method doc in `StateStore` API, it returns non-null row if the key exists. So basically we should avoid write null to `StateStore`. You cannot distinguish if the returned null row is because the key doesn't exist, or the value is actually null. And due to the defined behavior of `get`, it is quite easy to cause NPE error if the caller doesn't expect to get a null if the caller believes the key exists.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added test.
Closes#32796 from viirya/fix-ss-joinstatemanager.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
### What changes were proposed in this pull request?
It's a long-standing bug that we forgot to resolve `UnresolvedAlias` in `CollectMetrics`. It's a bit hard to trigger this bug before 3.2 as most likely people won't create `UnresolvedAlias` when calling `Dataset.observe`. However things have been changed after https://github.com/apache/spark/pull/30974
This PR proposes to handle `CollectMetrics` in the rule `ResolveAliases`.
### Why are the changes needed?
bug fix
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
updated test
Closes#32803 from cloud-fan/minor.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Optimizes the retrieval of approximate quantiles for an array of percentiles.
* Adds an overload for QuantileSummaries.query that accepts an array of percentiles and optimizes the computation to do a single pass over the sketch and avoid redundant computation.
* Modifies the ApproximatePercentiles operator to call into the new method.
All formatting changes are the result of running ./dev/scalafmt
### Why are the changes needed?
The existing implementation does repeated calls per input percentile resulting in redundant computation.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests for the new method.
Closes#32700 from alkispoly-db/spark_35558_approx_quants_array.
Authored-by: Alkis Polyzotis <alkis.polyzotis@databricks.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
Override `getJDBCType` method in `MySQLDialect` so that `FloatType` is mapped to `FLOAT` instead of `REAL`
### Why are the changes needed?
MySQL treats `REAL` as a synonym to `DOUBLE` by default (see https://dev.mysql.com/doc/refman/8.0/en/numeric-types.html). Therefore, when creating a table with a column of `REAL` type, it will be created as `DOUBLE`. However, currently, `MySQLDialect` does not provide an implementation for `getJDBCType`, and will thus ultimately fall back to `JdbcUtils.getCommonJDBCType`, which maps `FloatType` to `REAL`. This change is needed so that we can properly map the `FloatType` to `FLOAT` for MySQL.
### Does this PR introduce _any_ user-facing change?
Prior to this PR, when writing a dataframe with a `FloatType` column to a MySQL table, it will create a `DOUBLE` column. After the PR, it will create a `FLOAT` column.
### How was this patch tested?
Added a test case in `JDBCSuite` that verifies the mapping.
Closes#32605 from mariosmeim-db/SPARK-35446.
Authored-by: Marios Meimaris <marios.meimaris@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
A followup for 345d35ed1a, in this PR we support CURRENT_USER without tailing parentheses in default mode. And for ANSI mode, we can only use CURRENT_USER without tailing parentheses because it is a reserved keyword that cannot be used as a function name
### Why are the changes needed?
1. make it the same as current_date/current_timestamp
2. better ANSI compliance
### Does this PR introduce _any_ user-facing change?
no, just a followup
### How was this patch tested?
new tests
Closes#32770 from yaooqinn/SPARK-21957-F.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR is to fix the `UnsupportedOperationException` described in [PR#32705](https://github.com/apache/spark/pull/32705).
When AQE and DPP are turned on at the same time, because the `BroadcastExchange` included in the DPP filter is not added through `EnsureRequirement` rule, Therefore, when AQE optimizes the DPP filter, there is no way to add `BroadcastExchange` through the `EnsureRequirement` rule in `reOptimize` method, which eventually leads to the loss of `BroadcastExchange` in the final physical plan. This PR adds `BroadcastExchange` node in the `reOptimize` method if the current plan is DPP filter.
### Why are the changes needed?
bug fix
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
adding new ut
Closes#32741 from JkSelf/fixDPP+AQEbug.
Authored-by: Ke Jia <ke.a.jia@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Add database if exists check in `SeesionCatalog`
### Why are the changes needed?
Curently execute `drop database test` will throw unfriendly error msg.
```
Error in query: org.apache.hadoop.hive.metastore.api.NoSuchObjectException: test
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.metastore.api.NoSuchObjectException: test
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:112)
at org.apache.spark.sql.hive.HiveExternalCatalog.dropDatabase(HiveExternalCatalog.scala:200)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.dropDatabase(ExternalCatalogWithListener.scala:53)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.dropDatabase(SessionCatalog.scala:273)
at org.apache.spark.sql.execution.command.DropDatabaseCommand.run(ddl.scala:111)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3707)
```
### Does this PR introduce _any_ user-facing change?
Yes, more cleaner error msg.
### How was this patch tested?
Add test.
Closes#32768 from ulysses-you/SPARK-35629.
Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
Currently, we do not have a suitable definition of the `user` concept in Spark. We only have a `sparkUser` app widely but do not support identify or retrieve the user information from a session in STS or a runtime query execution.
`current_user()` is very popular and supported by plenty of other modern or old school databases, and also ANSI compliant.
This PR add `current_user()` as a SQL function. And, they are the same. In this PR, we add these functions w/o ambiguity.
1. For a normal single-threaded Spark application, clearly the `sparkUser` is always equivalent to `current_user()` .
2. For a multi-threaded Spark application, e.g. Spark thrift server, we use a `ThreadLocal` variable to store the client-side user(after authenticated) before running the query and retrieve it in the parser.
### Why are the changes needed?
`current_user()` is very popular and supported by plenty of other modern or old school databases, and also ANSI compliant.
### Does this PR introduce _any_ user-facing change?
yes, added `current_user()` as a SQL function
### How was this patch tested?
new tests in thrift server and sql/catalyst
Closes#32718 from yaooqinn/SPARK-21957.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Add rule `ConvertToLocalRelation` into AQE Optimizer.
### Why are the changes needed?
Support propagate empty local relation through project and filter like such SQL case:
```
Aggregate
Project
Join
ShuffleStage
ShuffleStage
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add test.
Closes#32724 from ulysses-you/SPARK-35585.
Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
The condition check for FULL OUTER sort merge join (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L1368 ) has unnecessary trip when `leftIndex == leftMatches.size` or `rightIndex == rightMatches.size`. Though this does not affect correctness (`scanNextInBuffered()` returns false anyway). But we can avoid it in the first place.
### Why are the changes needed?
Better readability for developers and avoid unnecessary execution.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests, such as `OuterJoinSuite.scala`.
Closes#32736 from c21/join-bug.
Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Gengliang Wang <ltnwgl@gmail.com>
### What changes were proposed in this pull request?
A test case of AdaptiveQueryExecSuite becomes flaky since there are too many debug logs in RootLogger:
https://github.com/Yikun/spark/runs/2715222392?check_suite_focus=truehttps://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139125/testReport/
To fix it, I suggest supporting multiple loggers in the testing method withLogAppender. So that the LogAppender gets clean target log outputs.
### Why are the changes needed?
Fix a flaky test case.
Also, reduce unnecessary memory cost in tests.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test
Closes#32725 from gengliangwang/fixFlakyLogAppender.
Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
Move `Set` command related test cases from `SQLQuerySuite` to a new test suite `SetCommandSuite`. There are 7 test cases in total.
### Why are the changes needed?
Code refactoring. `SQLQuerySuite` is becoming big.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests
Closes#32732 from gengliangwang/setsuite.
Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
In the PR, I propose to support special datetime values introduced by #25708 and by #25716 only in typed literals, and don't recognize them in parsing strings to dates/timestamps. The following string values are supported only in typed timestamp literals:
- `epoch [zoneId]` - `1970-01-01 00:00:00+00 (Unix system time zero)`
- `today [zoneId]` - midnight today.
- `yesterday [zoneId]` - midnight yesterday
- `tomorrow [zoneId]` - midnight tomorrow
- `now` - current query start time.
For example:
```sql
spark-sql> SELECT timestamp 'tomorrow';
2019-09-07 00:00:00
```
Similarly, the following special date values are supported only in typed date literals:
- `epoch [zoneId]` - `1970-01-01`
- `today [zoneId]` - the current date in the time zone specified by `spark.sql.session.timeZone`.
- `yesterday [zoneId]` - the current date -1
- `tomorrow [zoneId]` - the current date + 1
- `now` - the date of running the current query. It has the same notion as `today`.
For example:
```sql
spark-sql> SELECT date 'tomorrow' - date 'yesterday';
2
```
### Why are the changes needed?
In the current implementation, Spark supports the special date/timestamp value in any input strings casted to dates/timestamps that leads to the following problems:
- If executors have different system time, the result is inconsistent, and random. Column values depend on where the conversions were performed.
- The special values play the role of distributed non-deterministic functions though users might think of the values as constants.
### Does this PR introduce _any_ user-facing change?
Yes but the probability should be small.
### How was this patch tested?
By running existing test suites:
```
$ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite -- -z interval.sql"
$ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite -- -z date.sql"
$ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite -- -z timestamp.sql"
$ build/sbt "test:testOnly *DateTimeUtilsSuite"
```
Closes#32714 from MaxGekk/remove-datetime-special-values.
Lead-authored-by: Max Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
Currently, the results of following SQL queries are not redacted:
```
SET [KEY];
SET;
```
For example:
```
scala> spark.sql("set javax.jdo.option.ConnectionPassword=123456").show()
+--------------------+------+
| key| value|
+--------------------+------+
|javax.jdo.option....|123456|
+--------------------+------+
scala> spark.sql("set javax.jdo.option.ConnectionPassword").show()
+--------------------+------+
| key| value|
+--------------------+------+
|javax.jdo.option....|123456|
+--------------------+------+
scala> spark.sql("set").show()
+--------------------+--------------------+
| key| value|
+--------------------+--------------------+
|javax.jdo.option....| 123456|
```
We should hide the sensitive information and redact the query output.
### Why are the changes needed?
Security.
### Does this PR introduce _any_ user-facing change?
Yes, the sensitive information in the output of Set commands are redacted
### How was this patch tested?
Unit test
Closes#32712 from gengliangwang/redactSet.
Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This is a minor change to update how `StateStoreRestoreExec` computes its number of output rows. Previously we only count input rows, but the optionally restored rows are not counted in.
### Why are the changes needed?
Currently the number of output rows of `StateStoreRestoreExec` only counts the each input row. But it actually outputs input rows + optional restored rows. We should provide correct number of output rows.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
Closes#32703 from viirya/fix-outputrows.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR refactors `SubqueryExpression` class. It removes the children field from SubqueryExpression's constructor and adds `outerAttrs` and `joinCond`.
### Why are the changes needed?
Currently, the children field of a subquery expression is used to store both collected outer references in the subquery plan and join conditions after correlated predicates are pulled up.
For example:
`SELECT (SELECT max(c1) FROM t1 WHERE t1.c1 = t2.c1) FROM t2`
During the analysis phase, outer references in the subquery are stored in the children field: `scalar-subquery [t2.c1]`, but after the optimizer rule `PullupCorrelatedPredicates`, the children field will be used to store the join conditions, which contain both the inner and the outer references: `scalar-subquery [t1.c1 = t2.c1]`. This is why the references of SubqueryExpression excludes the inner plan's output:
29ed1a2de4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala (L68-L69)
This can be confusing and error-prone. The references for a subquery expression should always be defined as outer attribute references.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes#32687 from allisonwang-db/refactor-subquery-expr.
Authored-by: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
After SPARK-29291 and SPARK-33352, there are still some compilation warnings about `procedure syntax is deprecated` as follows:
```
[WARNING] [Warn] /spark/core/src/main/scala/org/apache/spark/MapOutputTracker.scala:723: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `registerMergeResult`'s return type
[WARNING] [Warn] /spark/core/src/main/scala/org/apache/spark/MapOutputTracker.scala:748: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `unregisterMergeResult`'s return type
[WARNING] [Warn] /spark/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala:223: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `testSimpleSpillingForAllCodecs`'s return type
[WARNING] [Warn] /spark/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASBenchmark.scala:53: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `runBLASBenchmark`'s return type
[WARNING] [Warn] /spark/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala:110: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `assertEmptyRootPath`'s return type
[WARNING] [Warn] /spark/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala:602: [deprecation | origin= | version=2.13.0] procedure syntax is deprecated: instead, add `: Unit =` to explicitly declare `executeCTASWithNonEmptyLocation`'s return type
```
So the main change of this pr is cleanup these compilation warnings.
### Why are the changes needed?
Eliminate compilation warnings in Scala 2.13 and this change should be compatible with Scala 2.12
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass the Jenkins or GitHub Action
Closes#32669 from LuciferYang/re-clean-procedure-syntax.
Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Added the following TreePattern enums:
- EXCHANGE
- IN_SUBQUERY_EXEC
- UPDATE_FIELDS
Migrated `transformAllExpressions` call sites to use `transformAllExpressionsWithPruning`
### Why are the changes needed?
Reduce the number of tree traversals and hence improve the query compilation latency.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Perf diff:
Rule name | Total Time (baseline) | Total Time (experiment) | experiment/baseline
OptimizeUpdateFields | 54646396 | 27444424 | 0.5
ReplaceUpdateFieldsExpression | 24694303 | 2087517 | 0.08
Closes#32643 from sigmod/all_expressions.
Authored-by: Yingyi Bu <yingyi.bu@databricks.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
### What changes were proposed in this pull request?
I just noticed that `AdaptiveQueryExecSuite.SPARK-34091: Batch shuffle fetch in AQE partition coalescing` takes more than 10 minutes to finish, which is unacceptable.
This PR sets the shuffle partitions to 10 in that test, so that the test can finish with 5 seconds.
### Why are the changes needed?
speed up the test
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
N/A
Closes#32695 from cloud-fan/test.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Add a new method `isMaterialized` in `QueryStageExec`.
### Why are the changes needed?
Currently, we use `resultOption().get.isDefined` to check if a query stage has materialized. The code is not readable at a glance. It's better to use a new method like `isMaterialized` to define it.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass CI.
Closes#32689 from ulysses-you/SPARK-35552.
Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
### What changes were proposed in this pull request?
Various small code simplification/cleanup for OptimizeSkewedJoin
### Why are the changes needed?
code refactor
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing tests
Closes#32685 from cloud-fan/skew-join.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
### What changes were proposed in this pull request?
Initial implementation of RocksDBCheckpointMetadata. It persists the metadata for RocksDBFileManager.
### Why are the changes needed?
The RocksDBCheckpointMetadata persists the metadata for each committed batch in JSON format. The object contains all RocksDB file names and the number of total keys.
The metadata binds closely with the directory structure of RocksDBFileManager, as described in the design doc - [Directory Structure and Format for Files stored in DFS](https://docs.google.com/document/d/10wVGaUorgPt4iVe4phunAcjU924fa3-_Kf29-2nxH6Y/edit#heading=h.zgvw85ijoz2).
### Does this PR introduce _any_ user-facing change?
No. Internal implementation only.
### How was this patch tested?
New UT added.
Closes#32272 from xuanyuanking/SPARK-35172.
Lead-authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Co-authored-by: Tathagata Das <tathagata.das1565@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
Spark conv function is from MySQL and it's better to follow the MySQL behavior. MySQL returns the max unsigned long if the input string is too big, and Spark should follow it.
However, seems Spark has different behavior in two cases:
MySQL allows leading spaces but Spark does not.
If the input string is way too long, Spark fails with ArrayIndexOutOfBoundException
This patch now help conv follow behavior in those two cases
conv allows leading spaces
conv will return the max unsigned long when the input string is way too long
### Why are the changes needed?
fixing it to match the behavior of conv function to the (almost) only one reference of another DBMS, MySQL
### Does this PR introduce _any_ user-facing change?
Yes, as pointed out above
### How was this patch tested?
Add test
Closes#32684 from dgd-contributor/SPARK-33428.
Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Add a new data source V2 API: `LocalScan`. It is a special Scan that will happen on Driver locally instead of Executors.
### Why are the changes needed?
The new API improves the flexibility of the DSV2 API. It allows developers to implement connectors for data sources of small data sizes.
For example, we can build a data source for Spark History applications from Spark History Server RESTFUL API. The result set is small and fetching all the results from the Spark driver is good enough. Making it a data source allows us to operate SQL queries with filters or table joins.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test
Closes#32678 from gengliangwang/LocalScan.
Lead-authored-by: Gengliang Wang <ltnwgl@gmail.com>
Co-authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>