Commit graph

31197 commits

Yuanjian Li 3d57e00a7f [SPARK-36041][SS][DOCS] Introduce the RocksDBStateStoreProvider in the programming guide
### What changes were proposed in this pull request?
Add the document for the new RocksDBStateStoreProvider.
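
To make the doc change concrete, here is a minimal sketch of how a user would opt in to the new provider; the config key and class name follow the feature being documented, while the session itself is illustrative:

```scala
// A hedged sketch of enabling the RocksDB-backed state store for streaming queries.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("rocksdb-state-store-example")
  .config(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
  .getOrCreate()
```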

### Why are the changes needed?
User guide for the new feature.

### Does this PR introduce _any_ user-facing change?
No, doc only.

### How was this patch tested?
Doc only.

Closes #33683 from xuanyuanking/SPARK-36041.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-08-16 12:32:08 -07:00
zhuqi-lucas 05cd5f97c3 [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message in BlockPushErrorHandler in client
### What changes were proposed in this pull request?
Add a new type of error message in BlockPushErrorHandler which indicates that the PushBlockStream message was received after a new application attempt has started. This error message should be correctly handled in the client without retrying the block push.
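
A rough client-side sketch of the idea; the marker string below is hypothetical and only stands in for whatever constant the error handler actually matches on:

```scala
// Hypothetical sketch: when the server reports that a newer application attempt has
// started, treat the push as non-retriable and do not log it as an error.
val TOO_OLD_ATTEMPT_MARKER = "received after a new application attempt has started"  // assumed marker

def shouldRetryError(t: Throwable): Boolean = {
  val msg = Option(t.getMessage).getOrElse("")
  // Do not retry pushes that belong to a superseded application attempt.
  !msg.contains(TOO_OLD_ATTEMPT_MARKER)
}
```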

### Why are the changes needed?
When we get a block push failure because the push came from a too-old application attempt, we will neither retry pushing the block nor log the exception on the client side.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add the corresponding unit test.

Closes #33617 from zhuqi-lucas/master.

Authored-by: zhuqi-lucas <821684824@qq.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-08-16 13:58:48 -05:00
Xinrong Meng 4dcd746025 [SPARK-36469][PYTHON] Implement Index.map
### What changes were proposed in this pull request?
Implement `Index.map`.

The PR is based on https://github.com/databricks/koalas/pull/2136. Thanks awdavidson for the prototype.

`map` of CategoricalIndex and DatetimeIndex will be implemented in separate PRs.

### Why are the changes needed?
Mapping values using input correspondence (a dict, Series, or function) is supported in pandas as [Index.map](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Index.map.html).
We should also support that.

### Does this PR introduce _any_ user-facing change?
Yes. `Index.map` is available now.

```py
>>> psidx = ps.Index([1, 2, 3])

>>> psidx.map({1: "one", 2: "two", 3: "three"})
Index(['one', 'two', 'three'], dtype='object')

>>> psidx.map(lambda id: "{id} + 1".format(id=id))
Index(['1 + 1', '2 + 1', '3 + 1'], dtype='object')

>>> pser = pd.Series(["one", "two", "three"], index=[1, 2, 3])
>>> psidx.map(pser)
Index(['one', 'two', 'three'], dtype='object')
```

### How was this patch tested?
Unit tests.

Closes #33694 from xinrong-databricks/index_map.

Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
2021-08-16 11:06:10 -07:00
Kazuyuki Tanimura 8ee464cd7a [SPARK-32210][CORE] Fix NegativeArraySizeException in MapOutputTracker with large spark.default.parallelism
### What changes were proposed in this pull request?
The current `MapOutputTracker` class may throw `NegativeArraySizeException` with a large number of partitions. Within the serializeOutputStatuses() method, it tries to compress an array of mapStatuses and output the binary data into an (Apache) ByteArrayOutputStream. Inside ByteArrayOutputStream.toByteArray(), a negative-index exception happens because the index is an int and overflows (the 2GB limit) when the output binary size is too large.

This PR proposes two high-level ideas:
  1. Use `org.apache.spark.util.io.ChunkedByteBufferOutputStream`, which has a way to output the underlying buffer as `Array[Array[Byte]]`.
  2. Change the signatures from `Array[Byte]` to `Array[Array[Byte]]` in order to handle over 2GB compressed data.

### Why are the changes needed?
This issue seems to be missed out in the earlier effort of addressing 2GB limitations [SPARK-6235](https://issues.apache.org/jira/browse/SPARK-6235)

Without this fix, `spark.default.parallelism` needs to be kept at a low number. The drawback of setting a smaller spark.default.parallelism is that it requires more executor memory (more data per partition). Setting `spark.io.compression.zstd.level` to a higher number (default 1) hardly helps.

That essentially means we have a data size limit for shuffling, and it does not scale.
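
To make the chunking idea concrete, here is a toy sketch (not the actual `ChunkedByteBufferOutputStream`) of how exposing the data as `Array[Array[Byte]]` sidesteps the 2GB single-array limit:

```scala
// Toy sketch: accumulate output in fixed-size chunks so no single JVM array needs
// more than Int.MaxValue bytes; the result is exposed as Array[Array[Byte]].
import java.io.OutputStream
import scala.collection.mutable.ArrayBuffer

class ChunkedOutputStream(chunkSize: Int) extends OutputStream {
  private val chunks = ArrayBuffer.empty[Array[Byte]]
  private var pos = chunkSize  // forces allocation of the first chunk on first write

  override def write(b: Int): Unit = {
    if (pos == chunkSize) { chunks += new Array[Byte](chunkSize); pos = 0 }
    val current = chunks.last
    current(pos) = b.toByte
    pos += 1
  }

  // Unlike ByteArrayOutputStream.toByteArray, the total size can exceed 2GB because
  // no single flat Array[Byte] (indexed by an Int) is ever materialized.
  def toArrays: Array[Array[Byte]] =
    if (chunks.isEmpty) Array.empty
    else chunks.init.toArray :+ chunks.last.take(pos)
}
```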

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Passed existing tests
```
build/sbt "core/testOnly org.apache.spark.MapOutputTrackerSuite"
```
Also added a new unit test
```
build/sbt "core/testOnly org.apache.spark.MapOutputTrackerSuite  -- -z SPARK-32210"
```
Ran the benchmark using GitHub Actions and did not observe any performance penalties. The results are attached in this PR
```
core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
```

Closes #33721 from kazuyukitanimura/SPARK-32210.

Authored-by: Kazuyuki Tanimura <ktanimura@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-08-16 09:11:39 -07:00
Max Gekk f620996142 [SPARK-36418][SQL] Use CAST in parsing of dates/timestamps with default pattern
### What changes were proposed in this pull request?
In the PR, I propose to use the `CAST` logic when the pattern is not specified in `DateFormatter` or `TimestampFormatter`. In particular, invoke the `DateTimeUtils.stringToTimestampAnsi()` or `stringToDateAnsi()` in the case.

### Why are the changes needed?
1. This can improve user experience with Spark SQL by making the default date/timestamp parsers more flexible and tolerant to their inputs.
2. We make the default case consistent to the behavior of the `CAST` expression which makes implementation more consistent.

### Does this PR introduce _any_ user-facing change?
The changes shouldn't introduce behavior changes in regular cases, but they can influence corner cases. The new implementation is able to parse more dates/timestamps by default. For instance, the old (current) date parser can recognize dates only in the format **yyyy-MM-dd**, but the new one can handle:
   * `[+-]yyyy*`
   * `[+-]yyyy*-[m]m`
   * `[+-]yyyy*-[m]m-[d]d`
   * `[+-]yyyy*-[m]m-[d]d `
   * `[+-]yyyy*-[m]m-[d]d *`
   * `[+-]yyyy*-[m]m-[d]dT*`

Similarly for timestamps. The old (current) timestamp formatter is able to parse timestamps only in the format **yyyy-MM-dd HH:mm:ss** by default, but the new implementation can handle (see the sketch after this list):
   * `[+-]yyyy*`
   * `[+-]yyyy*-[m]m`
   * `[+-]yyyy*-[m]m-[d]d`
   * `[+-]yyyy*-[m]m-[d]d `
   * `[+-]yyyy*-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
   * `[+-]yyyy*-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
   * `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
   * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
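
A hedged illustration of the target semantics, assuming an active SparkSession `spark`: the pattern-less formatters now follow `CAST`, so inputs shaped like the patterns above parse the same way a cast does.

```scala
// Illustrative only: CAST already accepts these relaxed shapes, and the default
// (pattern-less) date/timestamp formatters are changed to delegate to it.
spark.sql("SELECT CAST('2021-8-16T11:22:33' AS TIMESTAMP), CAST('2021-8' AS DATE)").show()
```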

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *ImageFileFormatSuite"
$ build/sbt "test:testOnly *ParquetV2PartitionDiscoverySuite"
```

Closes #33709 from MaxGekk/datetime-cast-default-pattern.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-16 23:29:33 +08:00
Venkata krishnan Sowrirajan 2270ecf32f [SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level user documentation
### What changes were proposed in this pull request?

Document the push-based shuffle feature with a high-level overview of the feature and the corresponding configuration options for both the shuffle server side and the client side. This is how the changes to the doc look in the browser ([img](https://user-images.githubusercontent.com/8871522/129231582-ad86ee2f-246f-4b42-9528-4ccd693e86d2.png))
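
As a hedged sketch of the client-side setup the new documentation covers (the external shuffle service must also be running with merged-shuffle support; see the doc for the server-side options):

```scala
// Illustrative client-side configuration for push-based shuffle.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("push-based-shuffle-example")
  .config("spark.shuffle.service.enabled", "true")
  .config("spark.shuffle.push.enabled", "true")
  .getOrCreate()
```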

### Why are the changes needed?

Helps users understand the feature

### Does this PR introduce _any_ user-facing change?

Docs

### How was this patch tested?

N/A

Closes #33615 from venkata91/SPARK-36374.

Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-08-16 10:24:40 -05:00
Wenchen Fan f4b31c6068 [SPARK-36498][SQL] Reorder inner fields of the input query in byName V2 write
### What changes were proposed in this pull request?

Today, when we write data to a v2 table with byName mode, we only reorder the top-level columns, not inner struct fields. This doesn't make sense, as Spark should treat inner struct fields as first-class citizens (e.g. nested column pruning, filter pushdown with nested columns).

This PR improves `TableOutputResolver` to reorder inner fields as well.
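
A hedged illustration of the kind of query this enables; the catalog/table name and schema are made up, and an appropriately configured v2 catalog is assumed:

```scala
// The target v2 table is assumed to declare s as struct<a INT, b STRING>, while the
// input query produces the inner fields in the opposite order; with this change the
// inner fields are matched and reordered by name.
val df = spark.sql("SELECT named_struct('b', 'x', 'a', 1) AS s")
df.writeTo("testcat.ns.t").append()
```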

### Why are the changes needed?

better user-experience

### Does this PR introduce _any_ user-facing change?

yes, more queries are allowed to write to v2 tables.

### How was this patch tested?

new test

Closes #33728 from cloud-fan/reorder.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-16 15:08:08 +08:00
Kousuke Saruta 9b9db5a8a0 [SPARK-36491][SQL] Make from_json/to_json to handle timestamp_ntz type properly
### What changes were proposed in this pull request?

This PR fixes an issue that `from_json` and `to_json` cannot handle `timestamp_ntz` type properly.
In the current master, `from_json`/`to_json` can handle the `timestamp` type as follows.
```
SELECT from_json('{"a":"2021-11-23 11:22:33"}', "a TIMESTAMP");
{"a":2021-11-23 11:22:33}
```
```
SELECT to_json(map("a", TIMESTAMP"2021-11-23 11:22:33"));
{"a":"2021-11-23T11:22:33.000+09:00"}
```
But they cannot handle `timestamp_ntz` type properly.
```
SELECT from_json('{"a":"2021-11-23 11:22:33"}', "a TIMESTAMP_NTZ");
21/08/12 16:16:00 ERROR SparkSQLDriver: Failed in [SELECT from_json('{"a":"2021-11-23 11:22:33"}', "a TIMESTAMP_NTZ")]
java.lang.Exception: Unsupported type: timestamp_ntz
        at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedTypeError(QueryExecutionErrors.scala:777)
        at org.apache.spark.sql.catalyst.json.JacksonParser.makeConverter(JacksonParser.scala:339)
        at org.apache.spark.sql.catalyst.json.JacksonParser.$anonfun$makeConverter$17(JacksonParser.scala:313)
```
```
SELECT to_json(map("a", TIMESTAMP_NTZ"2021-11-23 11:22:33"));
21/08/12 16:14:07 ERROR SparkSQLDriver: Failed in [SELECT to_json(map("a", TIMESTAMP_NTZ"2021-11-23 11:22:33"))]
java.lang.RuntimeException: Failed to convert value 1637666553000000 (class of class java.lang.Long) with the type of TimestampNTZType to JSON.
        at org.apache.spark.sql.errors.QueryExecutionErrors$.failToConvertValueToJsonError(QueryExecutionErrors.scala:294)
        at org.apache.spark.sql.catalyst.json.JacksonGenerator.$anonfun$makeWriter$25(JacksonGenerator.scala:201)
        at org.apache.spark.sql.catalyst.json.JacksonGenerator.$anonfun$makeWriter$25$adapted(JacksonGenerator.scala:199)
        at org.apache.spark.sql.catalyst.json.JacksonGenerator.writeMapData(JacksonGenerator.scala:253)
        at org.apache.spark.sql.catalyst.json.JacksonGenerator.$anonfun$write$3(JacksonGenerator.scala:293)
        at org.apache.spark.sql.catalyst.json.JacksonGenerator.writeObject(JacksonGenerator.scala:206)
        at org.apache.spark.sql.catalyst.json.JacksonGenerator.write(JacksonGenerator.scala:292)
```
### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #33742 from sarutak/json-ntz.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-08-16 10:03:22 +03:00
Liang-Chi Hsieh 8b8d91cf64 [SPARK-36465][SS] Dynamic gap duration in session window
### What changes were proposed in this pull request?

This patch supports dynamic gap duration in session window.

### Why are the changes needed?

The gap duration used in session window is currently a static value. To support more complex usage, it is better to support a dynamic gap duration that is determined by looking at the current data. For example, in our use case, we may want a different gap depending on a certain column in the input rows.
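
A hedged sketch of that use case, assuming a `session_window` variant that accepts a gap expression as described by this patch; the input DataFrame is a toy stand-in:

```scala
import org.apache.spark.sql.functions._

// `spark` is assumed to be an active SparkSession; a tiny batch DataFrame stands in
// for the real (streaming) input.
val events = spark.createDataFrame(Seq(
  (java.sql.Timestamp.valueOf("2021-08-16 10:00:00"), "click", "u1"),
  (java.sql.Timestamp.valueOf("2021-08-16 10:02:00"), "view",  "u1")
)).toDF("eventTime", "eventType", "userId")

// The gap duration is derived from a column of each input row.
val sessionized = events
  .groupBy(
    session_window(col("eventTime"),
      when(col("eventType") === "click", "5 minutes").otherwise("30 seconds")),
    col("userId"))
  .count()
```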

### Does this PR introduce _any_ user-facing change?

Yes, users can specify dynamic gap duration.

### How was this patch tested?

Modified existing tests and new test.

Closes #33691 from viirya/dynamic-session-window-gap.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-08-16 11:06:00 +09:00
Hyukjin Kwon e369499d14 [MINOR][DOCS] More correct results for GitHub Actions build link at README.md
### What changes were proposed in this pull request?

This PR proposes to use more correct link for GitHub Actions build to only show the full builds at the commits in the master branch.

Current link includes scheduled builds (https://github.com/apache/spark/actions/workflows/build_and_test.yml?query=branch%3Amaster+event%3Aschedule).

### Why are the changes needed?

To show the full builds at the commits in the master branch only.

### Does this PR introduce _any_ user-facing change?

No, dev only.

Before:

![Screen Shot 2021-08-15 at 11 11 19 AM](https://user-images.githubusercontent.com/6477701/129464841-7207865e-3f94-4e18-8db6-e8fa36a926bf.png)

After:

![Screen Shot 2021-08-15 at 11 12 01 AM](https://user-images.githubusercontent.com/6477701/129464842-887d121e-88d3-4342-b11a-3048df88856d.png)

### How was this patch tested?

Manually checked by clicking these links.

Closes #33745 from HyukjinKwon/minor-link-readme.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-08-14 22:05:16 -07:00
Pablo Langa a9ab41ad56 [SPARK-35320][SQL] Align error message for unsupported key types in MapType in Json reader
### What changes were proposed in this pull request?

This PR is related with https://github.com/apache/spark/pull/33525.
The purpose is to align error messages between the function from_json and the Json reader for unsupported key types in MapType.
Current behavior:
```
scala> spark.read.schema(StructType(Seq(StructField("col", MapType(IntegerType, StringType))))).json(Seq("""{"1": "test"}""").toDS()).show
+----+
| col|
+----+
|null|
+----+

```
```
scala> Seq("""{"1": "test"}""").toDF("col").write.json("/tmp/jsontests1234")

scala> spark.read.schema(StructType(Seq(StructField("col", MapType(IntegerType, StringType))))).json("/tmp/jsontests1234").show
+----+
| col|
+----+
|null|
+----+
```
With this change, an AnalysisException with the message `"Input schema $schema can only contain StringType as a key type for a MapType."` will be thrown.

### Why are the changes needed?

It's more consistent to align the behavior

### Does this PR introduce _any_ user-facing change?

Yes, now an Exception will be thrown

### How was this patch tested?

Unit testing, manual testing

Closes #33672 from planga82/feature/spark35320_improve_error_message_reader.

Authored-by: Pablo Langa <soypab@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-15 10:31:57 +09:00
Huaxin Gao 3f8ec0dae4 [SPARK-34952][SQL][FOLLOWUP] Normalize pushed down aggregate col name and group by col name
### What changes were proposed in this pull request?
Normalize pushed down aggregate col names and group by col names ...

### Why are the changes needed?
to handle case sensitive col names

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Modify existing test

Closes #33739 from huaxingao/normalize.

Authored-by: Huaxin Gao <huaxin_gao@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-08-13 22:31:21 -07:00
Gengliang Wang ecdea91602 [SPARK-36508][SQL] ANSI type coercion: disallow binary operations between Interval and String literal
### What changes were proposed in this pull request?

If a binary operation contains interval type and string literal, we can't decide which interval type the string literal should be promoted as. There are many possible interval types, such as year interval, month interval, day interval, hour interval, etc.
The related binary operations for Interval include
- Add
- Subtract
- Comparisons

Note that `Interval Multiply/Divide StringLiteral` is valid, as those are not binary operations in this sense (the left and right sides are not of the same type). This PR also adds tests for them.

### Why are the changes needed?

Avoid ambiguously implicit casting string literals to interval types.

### Does this PR introduce _any_ user-facing change?

No, the ANSI type coercion is not released yet.

### How was this patch tested?

New tests.

Closes #33737 from gengliangwang/disallowStringAndInterval.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-08-14 10:45:05 +08:00
Angerszhuuuu 8c90ca8468 [SPARK-36475][DOC] Add doc about spark.shuffle.service.fetch.rdd.enabled
### What changes were proposed in this pull request?
Update doc about `spark.shuffle.service.fetch.rdd.enabled`

### Why are the changes needed?
Add doc about `spark.shuffle.service.fetch.rdd.enabled`

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Not needed; this is a doc-only change.

Closes #33700 from AngersZhuuuu/SPARK-25888-FOLLOWUP.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-08-13 10:11:14 -07:00
Huaxin Gao 959cf50a1b [SPARK-36503][SQL] Add RowToColumnConverter for BinaryType
### What changes were proposed in this pull request?
Add RowToColumnConverter for BinaryType

### Why are the changes needed?
Currently, we have a RowToColumnConverter for all data types except BinaryType:
```
  private def getConverterForType(dataType: DataType, nullable: Boolean): TypeConverter = {
    val core = dataType match {
      case BooleanType => BooleanConverter
      case ByteType => ByteConverter
      case ShortType => ShortConverter
      case IntegerType | DateType => IntConverter
      case FloatType => FloatConverter
      case LongType | TimestampType => LongConverter
      case DoubleType => DoubleConverter
      case StringType => StringConverter
      case CalendarIntervalType => CalendarConverter
      case at: ArrayType => ArrayConverter(getConverterForType(at.elementType, at.containsNull))
      case st: StructType => new StructConverter(st.fields.map(
        (f) => getConverterForType(f.dataType, f.nullable)))
      case dt: DecimalType => new DecimalConverter(dt)
      case mt: MapType => MapConverter(getConverterForType(mt.keyType, nullable = false),
        getConverterForType(mt.valueType, mt.valueContainsNull))
      case unknown => throw QueryExecutionErrors.unsupportedDataTypeError(unknown.toString)
    }
```
so this PR adds one for BinaryType.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
modify existing test

Closes #33733 from huaxingao/binary_converter.

Authored-by: Huaxin Gao <huaxin_gao@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-08-13 10:08:45 -07:00
zwangsheng 46f56e632f [SPARK-36487][CORE] Modify exit executor log logic
### What changes were proposed in this pull request?
Adjust the log logic of CoarseGrainedExecutorBackend

### Why are the changes needed?
When an executor exits with exit code 0, CoarseGrainedExecutorBackend prints an ERROR log.
That doesn't make sense, because a normal executor decommission is not caused by an exception.

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
Tested locally.

Closes #33718 from zwangsheng/enhance/exit-executor-log.

Authored-by: zwangsheng <2213335496@qq.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-08-13 11:44:36 -05:00
yi.wu a47ceaf549 [SPARK-32920][CORE][FOLLOW-UP] Fix string interpolator in the log
### What changes were proposed in this pull request?

fix string interpolator

### Why are the changes needed?

To log the correct stage info.
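
For context, the generic shape of this kind of bug, shown standalone (not the actual Spark log line):

```scala
// Without the `s` prefix the placeholder is not evaluated and the literal text is logged.
val stageId = 3
println("Stage $stageId finished")   // prints: Stage $stageId finished
println(s"Stage $stageId finished")  // prints: Stage 3 finished
```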

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass existing tests.

Closes #33738 from Ngone51/fix.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-13 21:44:22 +09:00
Xingbo Jiang ec5f3a17e3 [SPARK-36500][CORE] Fix temp_shuffle file leaking when a task is interrupted
### What changes were proposed in this pull request?

When a task thread is interrupted, the underlying output stream referred to by `DiskBlockObjectWriter.mcs` may have been closed, and then we get an IOException when flushing the buffered data. This breaks the assumption that `revertPartialWritesAndClose()` should not throw exceptions.

To fix the issue, we can catch the IOException in `ManualCloseOutputStream.manualClose()`.
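
A hedged sketch of the idea with simplified names (not the actual Spark code): swallow the IOException during the manual close so the no-throw contract holds and the temp_shuffle cleanup that follows still runs.

```scala
import java.io.{IOException, OutputStream}

def manualClose(out: OutputStream): Unit = {
  try {
    out.close()
  } catch {
    case e: IOException =>
      // The task was likely interrupted and the underlying stream is already closed;
      // log and continue instead of propagating.
      println(s"Ignoring IOException while closing stream: ${e.getMessage}")
  }
}
```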

### Why are the changes needed?

Previously the IOException was not captured, thus `revertPartialWritesAndClose()` threw an exception. When this happened, `BypassMergeSortShuffleWriter.stop()` would stop deleting the temp_shuffle files tracked by `partitionWriters`, hence leading to temp_shuffle file leak issues.

### Does this PR introduce _any_ user-facing change?

No, this is an internal bug fix.

### How was this patch tested?

Tested by running a longevity stress test. After the fix, there is no more leaked temp_shuffle files.

Closes #33731 from jiangxb1987/temp_shuffle.

Authored-by: Xingbo Jiang <xingbo.jiang@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-13 19:25:20 +09:00
yangjie01 1da1e33a49 [SPARK-36495][SQL] Use type match to simplify methods in CatalystTypeConverter
### What changes were proposed in this pull request?

The `CatalystTypeConverter.toCatalyst` method uses `isInstanceOf + asInstanceOf` for type conversion; the main change of this PR is to use a type match to simplify this process.

`CatalystTypeConverters.createToCatalystConverter` method has a similar pattern.
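
A generic before/after illustration of the simplification (not the Spark code itself):

```scala
// Before: instance checks plus casts.
def toCatalystOld(a: Any): Any =
  if (a == null) null
  else if (a.isInstanceOf[Option[_]]) a.asInstanceOf[Option[Any]].getOrElse(null)
  else a

// After: a type match expresses the same logic more directly.
def toCatalystNew(a: Any): Any = a match {
  case null        => null
  case Some(value) => value
  case None        => null
  case other       => other
}
```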

### Why are the changes needed?
Code simplification

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Pass the Jenkins or GitHub Action
- Add a new case to `ScalaReflectionSuite` to add the coverage of the `case None` branch of `CatalystTypeConverter#toCatalyst` method

Closes #33722 from LuciferYang/SPARK-36495.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-13 19:24:22 +09:00
Kousuke Saruta 41436b2956 [SPARK-36507][DOCS] Remove/Replace missing links to AMP Camp materials from index.md
### What changes were proposed in this pull request?

This PR removes/replaces missing links to AMP Camp materials from `index.md`.
I found videos about AMP Camps on YouTube so I replaced the links to the videos with them, and removes the rest of missing links.

### Why are the changes needed?

Document maintenance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Built and confirmed the `index.html` generated from `index.md`.
Also confirmed that the replaced link is available.

Closes #33734 from sarutak/remove-and-replace-missing-links.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-08-13 13:19:28 +03:00
Kousuke Saruta 7fd34548b1 [SPARK-36490][SQL] Make from_csv/to_csv to handle timestamp_ntz type properly
### What changes were proposed in this pull request?

This PR fixes an issue that `from_csv` and `to_csv` cannot handle `timestamp_ntz` type properly.
In the current master, to_csv/from_csv can handle the timestamp type as follows.
```
SELECT to_csv(struct(TIMESTAMP"2021-11-23 11:22:33"));
2021-11-23T11:22:33.000+09:00
```
```
SELECT from_csv("2021-11-23 11:22:33", "a TIMESTAMP");
{"a":2021-11-23 11:22:33}
```

But they cannot handle timestamp_ntz type properly.
```
SELECT to_csv(struct(TIMESTAMP_NTZ"2021-11-23 11:22:33"));
-- 2021-11-23T11:22:33.000 is expected.
1637666553000000
```
```
SELECT from_csv("2021-11-23 11:22:33", "a TIMESTAMP_NTZ");
21/08/12 16:12:49 ERROR SparkSQLDriver: Failed in [SELECT from_csv("2021-11-23 11:22:33", "a TIMESTAMP_NTZ")]
java.lang.Exception: Unsupported type: timestamp_ntz
        at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedTypeError(QueryExecutionErrors.scala:777)
        at org.apache.spark.sql.catalyst.csv.UnivocityParser.makeConverter(UnivocityParser.scala:234)
        at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$valueConverters$1(UnivocityParser.scala:134)
```

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #33719 from sarutak/csv-ntz.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-08-13 12:08:53 +03:00
Gengliang Wang eb6be7f1ee [SPARK-36499][SQL][TESTS] Test Interval multiply / divide null
### What changes were proposed in this pull request?

Test the following valid operations:
```
year-month interval * null
null * year-month interval
year-month interval / null
```
and invalid operations:
```
null / interval
int / interval
```

### Why are the changes needed?

Improve test coverage

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass CI

Closes #33729 from gengliangwang/addTest.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-08-13 11:05:57 +03:00
gengjiaan 7d82336734 [SPARK-36428][SQL] the seconds parameter of make_timestamp should accept integer type
### What changes were proposed in this pull request?
With ANSI mode, `SELECT make_timestamp(1, 1, 1, 1, 1, 1)` fails, because the 'seconds' parameter needs to be of type DECIMAL(8,6), and INT can't be implicitly cast to DECIMAL(8,6) under ANSI mode.

```
org.apache.spark.sql.AnalysisException
cannot resolve 'make_timestamp(1, 1, 1, 1, 1, 1)' due to data type mismatch: argument 6 requires decimal(8,6) type, however, '1' is of int type.; line 1 pos 7
```

We should update the function `make_timestamp` to allow integer type 'seconds' parameter.

### Why are the changes needed?
Make `make_timestamp` accept an integer as the 'seconds' parameter.

### Does this PR introduce _any_ user-facing change?
'Yes'.
`make_timestamp` now accepts an integer as the 'seconds' parameter.

### How was this patch tested?
New tests.

Closes #33665 from beliefer/SPARK-36428.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-13 13:13:02 +08:00
Maryann Xue 29b1e394c6 [SPARK-36447][SQL] Avoid inlining non-deterministic With-CTEs
### What changes were proposed in this pull request?
This PR fixes an existing correctness issue where a non-deterministic With-CTE can be executed multiple times producing different results, by deferring the inline of With-CTE to after the analysis stage. This fix also provides the future opportunity of performance improvement by executing deterministic With-CTEs only once in some circumstances.

The major changes include:
1. Added new With-CTE logical nodes: `CTERelationDef`, `CTERelationRef`, `WithCTE`. Each `CTERelationDef` has a unique ID and the mapping between CTE def and CTE ref is based on IDs rather than names. `WithCTE` is a resolved version of `With`, only that: 1) `WithCTE` is a multi-children logical node so that most logical rules can automatically apply to CTE defs; 2) In the main query and each subquery, there can only be at most one `WithCTE`, which means nested With-CTEs are combined.
2. Changed `CTESubstitution` rule so that if NOT in legacy mode, CTE defs will not be inlined immediately, but rather transformed into a `CTERelationRef` per reference.
3. Added new With-CTE rules: 1) `ResolveWithCTE` - to update `CTERelationRef`s with resolved output from corresponding `CTERelationDef`s; 2) `InlineCTE` - to inline deterministic CTEs or non-deterministic CTEs with only ONE reference; 3) `UpdateCTERelationStats` - to update stats for `CTERelationRef`s that are not inlined.
4. Added a CTE physical planning strategy to plan `CTERelationRef`s as an independent shuffle with round-robin partitioning so that such CTEs will only be materialized once and different references will later be a shuffle reuse.

A current limitation is that With-CTEs mixed with SQL commands or DMLs will still go through the old inline code path because of our non-standard language specs and not-unified command/DML interfaces.

### Why are the changes needed?
This is a correctness issue. A non-deterministic CTE should produce the same output regardless of how many times it is referenced/used in a query, while under the current implementation there is no such guarantee, which can lead to incorrect query results.
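
A hedged illustration of the issue, assuming an active SparkSession `spark`: the non-deterministic CTE is referenced twice, and if it is simply inlined, the two references can observe different `rand()` values.

```scala
spark.sql("""
  WITH r AS (SELECT rand() AS v)
  SELECT a.v, b.v FROM r AS a CROSS JOIN r AS b
""").show()
```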

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added UTs.
Regenerated golden files for TPCDS plan stability tests. There is NO change to the `simplified.txt` files, the only differences are expression IDs.

Closes #33671 from maryannxue/spark-36447.

Authored-by: Maryann Xue <maryann.xue@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-13 11:37:02 +08:00
Dongjoon Hyun e8e5785f02 [SPARK-36502][SQL] Remove jaxb-api from sql/catalyst module
### What changes were proposed in this pull request?

This PR aims to remove `jaxb-api` usage from `sql/catalyst` module.

### Why are the changes needed?

We only use `DatatypeConverter.parseHexBinary` and `DatatypeConverter.printHexBinary` twice.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

Closes #33732 from dongjoon-hyun/SPARK-36502.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-13 12:31:09 +09:00
Gengliang Wang d4466d55ca [SPARK-36497][SQL] Support Interval add/subtract NULL
### What changes were proposed in this pull request?

Currently, `null + interval` will become `cast(cast(null as timestamp) + interval) as null`. This is an unexpected behavior and the result should not be of null type.
This weird behavior applies to `null - interval`, `interval + null`, `interval - null` as well.
To change it, I propose to cast the null as the same data type of the other element in the add/subtract:
```
null + interval => cast(null as interval) + interval
null - interval => cast(null as interval) - interval
interval + null=> interval + cast(null as interval)
interval - null => interval - cast(null as interval)
```

### Why are the changes needed?

Change the confusing behavior of `Interval +/- NULL` and `NULL +/- Interval`

### Does this PR introduce _any_ user-facing change?

No, the new interval type is not released yet.

### How was this patch tested?

Existing UT

Closes #33727 from gengliangwang/intervalTypeCoercion.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-13 11:10:32 +08:00
Tim Armstrong 886dbe01cd [SPARK-36501][ML] Fix random col names in LSHModel.approxSimilarityJoin
### What changes were proposed in this pull request?
Random.nextString() can include characters that are not valid in identifiers or are likely to be buggy, e.g. non-printing characters, ".", "`". Instead, use a utility that always generates valid alphanumeric identifiers.

### Why are the changes needed?
To deflake BucketedRandomProjectionLSHSuite and avoid similar failures that could be encountered by users.
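
An illustrative comparison of the two approaches (standalone, not the ML code itself):

```scala
import scala.util.Random

val risky = Random.nextString(8)                  // may contain '.', '`' or non-printable characters
val safe  = Random.alphanumeric.take(8).mkString  // always plain alphanumeric characters
println(s"risky=$risky safe=$safe")
```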

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Ran org.apache.spark.ml.feature.BucketedRandomProjectionLSHSuite

Closes #33730 from timarmstrong/flaky-lsb.

Authored-by: Tim Armstrong <tim.armstrong@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-13 12:04:42 +09:00
Min Shen b8e2186fe1 [SPARK-36483][CORE][TESTS] Fix intermittent test failures at Netty 4.1.52+
### What changes were proposed in this pull request?

Fix an intermittent test failure due to Netty dependency version bump.
Starting from Netty 4.1.52, its AbstractChannel will throw a new `StacklessClosedChannelException` for channel closed exception.
A hardcoded list of Strings to match for channel closed exception in `RPCIntegrationSuite` was not updated, thus leading to the intermittent test failure reported in #33613

### Why are the changes needed?

Fix intermittent test failure

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Closes #33713 from Victsm/SPARK-36378-followup.

Authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-08-12 20:15:09 -05:00
Gengliang Wang 48e333af54 [SPARK-36445][SQL][FOLLOWUP] ANSI type coercion: revisit promoting string literals in datetime expressions
### What changes were proposed in this pull request?

1. Promote more string literals in subtractions. In the ANSI type coercion rule, we already promoted
```
string - timestamp => cast(string as timestamp) - timestamp
```
This PR is to promote the following string literals:
```
string - date => cast(string as date) - date
date - string => date - cast(string as date)
timestamp - string => timestamp - cast(string as timestamp)
```
It is very straightforward to cast the string literal as the data type of the other side in the subtraction.

2. Merge the string promotion logic from the rule `StringLiteralCoercion`:
```
date_sub(date, string) => date_sub(date, cast(string as int))
date_add(date, string) => date_add(date, cast(string as int))
```

### Why are the changes needed?

1. Promote the string literal in the subtraction to the data type of the other side. This is straightforward and consistent with PostgreSQL.
2. Centralize all the string literal promotion in the ANSI type coercion rule.

### Does this PR introduce _any_ user-facing change?

No, the new ANSI type coercion rules are not released yet.

### How was this patch tested?

Existing UT

Closes #33724 from gengliangwang/datetimeTypeCoercion.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-08-13 01:02:34 +08:00
Wenchen Fan 124d011ee7 [SPARK-35881][SQL][FOLLOWUP] Remove the AQE post stage creation extension
### What changes were proposed in this pull request?

This is a followup of #33140

It turns out that we may be able to complete the AQE and columnar execution integration without the AQE post stage creation extension. The rule `ApplyColumnarRulesAndInsertTransitions` can add to-columnar transition if the shuffle/broadcast supports columnar.

### Why are the changes needed?

remove APIs that are not needed.

### Does this PR introduce _any_ user-facing change?

No, the APIs are not released yet.

### How was this patch tested?

existing and manual tests

Closes #33701 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-12 21:35:28 +08:00
dgd-contributor 9b96b705b2 [SPARK-36097][CORE] Grouping exception in core/scheduler
### What changes were proposed in this pull request?
This PR groups exception messages in core/src/main/scala/org/apache/spark/scheduler.

### Why are the changes needed?
It will largely help with standardization of error messages and its maintenance.

### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.

### How was this patch tested?
No new tests - pass all original tests to make sure it doesn't break any existing behavior.

Closes #33529 from dgd-contributor/SPARK-36097.

Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-12 15:27:17 +08:00
IonutBoicuAms 2b665751d9 [SPARK-36489][SQL] Aggregate functions over no grouping keys, on tables with a single bucket, return multiple rows
### What changes were proposed in this pull request?

This PR fixes a bug in `DisableUnnecessaryBucketedScan`.
When running any aggregate function, without any grouping keys, on a table with a single bucket, multiple rows are returned.
This happens because the aggregate function satisfies the `AllTuples` distribution, no `Exchange` will be planned, and the bucketed scan will be disabled.
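
A hedged reproduction sketch of the symptom, assuming an active SparkSession `spark`; the table name is illustrative:

```scala
import spark.implicits._

// A global aggregate over a table with a single bucket must yield exactly one row.
Seq(1, 2, 3).toDF("i").write.bucketBy(1, "i").saveAsTable("one_bucket")
spark.table("one_bucket").groupBy().count().show()  // expected: a single row with count 3
```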

### Why are the changes needed?

Bug fixing. Aggregates over no grouping keys should return a single row.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added new test in `DisableUnnecessaryBucketedScanSuite`.

Closes #33711 from IonutBoicuAms/fix-bug-disableunnecessarybucketedscan.

Authored-by: IonutBoicuAms <ionut.boicu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-12 15:22:38 +08:00
Wenchen Fan 00a4364f38 [SPARK-36479][SQL][TEST] Improve datetime test coverage in SQL files
### What changes were proposed in this pull request?

This PR adds more datetime tests in `date.sql` and `timestamp.sql`, especially for string promotion.

### Why are the changes needed?

improve test coverage

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

Closes #33707 from cloud-fan/test.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-12 12:51:50 +08:00
Jungtaek Lim fac4e5eb3e [SPARK-36480][SS] SessionWindowStateStoreSaveExec should not filter input rows against watermark
### What changes were proposed in this pull request?

This PR proposes to remove the filter applied to input rows against the watermark in SessionWindowStateStoreSaveExec, since SessionWindowStateStoreSaveExec is expected to store all inputs in the state store and apply eviction later.

### Why are the changes needed?

The code is logically not right, though I can't reproduce the actual problem.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests. I can't come up with a broken case that fails on the previous code, but we can review the logic instead.

Closes #33708 from HeartSaVioR/SPARK-36480.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-08-11 20:10:59 -07:00
Cheng Su 79515e4b6c [SPARK-32986][SQL] Add bucketed scan info in query plan of data source v1
### What changes were proposed in this pull request?

As a followup from the discussion in https://github.com/apache/spark/pull/29804#discussion_r493229395 , currently the query plan for the data source v1 scan operator - `FileSourceScanExec` - has no information to indicate whether the table is read as a bucketed table or not, and if it is not, what the reason behind it is. Adding this info into the `FileSourceScanExec` physical query plan output can help users and developers understand the query plan more easily, without spending a lot of time debugging why the table is not read as a bucketed table.

### Why are the changes needed?

Help users and developers debug query plan for bucketed table.

### Does this PR introduce _any_ user-facing change?

The added `Bucketed` information in physical query plan when reading bucketed table.
Note for reading non-bucketed table, the query plan stays same and nothing is changed.

Example:

```
Seq((1, 2), (2, 3)).toDF("i", "j").write.bucketBy(8, "i").saveAsTable("t1")
Seq(2, 3).toDF("i").write.bucketBy(8, "i").saveAsTable("t2")
val df1 = spark.table("t1")
val df2 = spark.table("t2")
df1.join(df2, df1("i") === df2("i"))
```

```
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [i#20], [i#24], Inner
   :- Sort [i#20 ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(i#20)
   :     +- FileScan parquet default.t1[i#20,j#21] Batched: true, Bucketed: true, DataFilters: [isnotnull(i#20)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int>, SelectedBucketsCount: 8 out of 8
   +- Sort [i#24 ASC NULLS FIRST], false, 0
      +- Filter isnotnull(i#24)
         +- FileScan parquet default.t2[i#24] Batched: true, Bucketed: true, DataFilters: [isnotnull(i#24)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
```

### How was this patch tested?

Added unit test in `ExplainSuite.scala`.

Closes #33698 from c21/scan-v1.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-08-11 19:41:36 -07:00
Liang-Chi Hsieh c250bbc70f [HOTFIX] Fix java style
### What changes were proposed in this pull request?

Fix a Java style issue blocking CI.

### Why are the changes needed?

Unblocking CI.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

CI

Closes #33715 from viirya/hotfix.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
2021-08-11 18:04:42 -07:00
Sean Owen 71fe0944e8 [SPARK-36481][ML] Expose LogisticRegression.setInitialModel, like KMeans et al do
### Why are the changes needed?

Several Spark ML components already allow setting of an initial model, including KMeans, LogisticRegression, and GaussianMixture. This is useful to begin training from a known reasonably good model.

However, the method in LogisticRegression is private to Spark. I don't see a good reason why it should be, as the others in KMeans et al. are not.

None of these are exposed in PySpark, which I don't necessarily want to question or deal with now; there are other places one could arguably set an initial model too, but here I am just interested in exposing the existing, tested functionality to callers.
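
A hedged usage sketch, assuming `setInitialModel` is exposed on LogisticRegression the same way the other estimators expose it; the toy dataset and an active SparkSession `spark` are assumptions for illustration:

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors

val train = spark.createDataFrame(Seq(
  (0.0, Vectors.dense(0.0, 1.1)),
  (1.0, Vectors.dense(2.0, 1.0))
)).toDF("label", "features")

val warmStart = new LogisticRegression().setMaxIter(5).fit(train)
val model = new LogisticRegression()
  .setInitialModel(warmStart)  // continue training from a known reasonably good model
  .setMaxIter(50)
  .fit(train)
```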

### Does this PR introduce _any_ user-facing change?

Other than the new API method, no.

### How was this patch tested?

Existing tests

Closes #33710 from srowen/SPARK-36481.

Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2021-08-11 23:20:49 +00:00
Sean Owen 3b0dd14f1c Update Spark key negotiation protocol 2021-08-11 18:04:55 -05:00
William Hyun aff1b5594a [SPARK-36482][BUILD] Bump orc to 1.6.10
### What changes were proposed in this pull request?
This PR aims to bump ORC to 1.6.10

### Why are the changes needed?
This will bring the latest bug fixes.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass the CIs.

Closes #33712 from williamhyun/orc.

Authored-by: William Hyun <william@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-08-11 11:32:06 -07:00
Hyukjin Kwon ccead315b3 [SPARK-36474][PYTHON][DOCS] Mention 'pandas API on Spark' in Spark overview pages
### What changes were proposed in this pull request?

This PR proposes to mention pandas API on Spark at Spark overview pages.

### Why are the changes needed?

To mention the new component.

### Does this PR introduce _any_ user-facing change?

Yes, it changes the documentation.

### How was this patch tested?

Manually tested by MD editor. For `docs/index.md`, I manually checked by building the docs by `SKIP_API=1 bundle exec jekyll serve --watch`.

Closes #33699 from HyukjinKwon/SPARK-36474.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-11 22:57:26 +09:00
Max Gekk bbf988bd73 [SPARK-36468][SQL][DOCS] Update docs about ANSI interval literals
### What changes were proposed in this pull request?
In the PR, I propose to update the doc page https://spark.apache.org/docs/latest/sql-ref-literals.html#interval-literal, and describe formats of ANSI interval literals.

<img width="1032" alt="Screenshot 2021-08-11 at 10 31 36" src="https://user-images.githubusercontent.com/1580697/128988454-7a6ac435-409b-4961-9b79-ebecfb141d5e.png">
<img width="1030" alt="Screenshot 2021-08-10 at 20 58 04" src="https://user-images.githubusercontent.com/1580697/128912018-a4ea3ee5-f252-49c7-a90e-5beaf7ac868f.png">

### Why are the changes needed?
To improve UX with Spark SQL, and inform users about recently added ANSI interval literals.
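
For example (assuming an active SparkSession `spark`), the page now covers literal forms like these:

```scala
spark.sql(
  "SELECT INTERVAL '2-11' YEAR TO MONTH AS ym, INTERVAL '1 10:30:45.123' DAY TO SECOND AS dt"
).show()
```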

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually checked the generated docs:
```
$ SKIP_API=1 SKIP_RDOC=1 SKIP_PYTHONDOC=1 SKIP_SCALADOC=1 bundle exec jekyll build
```

Closes #33693 from MaxGekk/doc-ansi-interval-literals.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-08-11 13:38:39 +03:00
yangjie01 f7c85b855b [SPARK-36456][CORE][SQL][SS] Clean up compilation warnings related to method closeQuietly in class IOUtils is deprecated
### What changes were proposed in this pull request?
There are some compilation warnings related to `method closeQuietly in class IOUtils is deprecated`. Apache commons-io suggests that we should use the try-with-resources statement or handle suppressed exceptions manually.

The main change of this PR is to replace `o.a.commons.io.IOUtils.closeQuietly` with `o.a.s.network.util.JavaUtils.closeQuietly` directly, because all of the original logic is suppressing `IOException`.

### Why are the changes needed?
Clean up compilation warnings related to `method closeQuietly in class IOUtils is deprecated`

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass the Jenkins or GitHub Action

Closes #33682 from LuciferYang/closeQuietly.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-08-10 23:17:57 -07:00
Gengliang Wang 3029e62a82 [SPARK-36445][SQL] ANSI type coercion rule for date time operations
### What changes were proposed in this pull request?

Implement a new rule for the date-time operations in the ANSI type coercion system (see the sketch after this list):
1. Date will be converted to Timestamp when it is in a subtraction with Timestamp.
2. Promote string literals in date_add/date_sub/time_add
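
Hedged illustrations of the two rules, assuming ANSI type coercion is active and an active SparkSession `spark`:

```scala
// 1. The date side of a date - timestamp subtraction is promoted to timestamp.
spark.sql("SELECT DATE'2021-08-11' - TIMESTAMP'2021-08-10 12:00:00'").show()
// 2. A string literal passed to date_add is promoted (cast to an integer number of days).
spark.sql("SELECT date_add(DATE'2021-08-11', '3')").show()
```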

### Why are the changes needed?

Currently the type coercion rule `DateTimeOperations` doesn't match the design of the ANSI type coercion system:
1. For date_add/date_sub, if the input is timestamp type, Spark should not convert it into date type since date type is narrower than the timestamp type.
2. For date_add/date_sub/time_add, a string value can be implicitly cast to date/timestamp only when it is a literal.

Thus, we need to have a new rule for the date-time operations in the ANSI type coercion system.

### Does this PR introduce _any_ user-facing change?

No, the ANSI type coercion rules are not released yet.

### How was this patch tested?

New UT

Closes #33666 from gengliangwang/datetimeOp.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2021-08-11 11:55:45 +08:00
attilapiros 1dced492fb [SPARK-36052][K8S] Introducing a limit for pending PODs
### What changes were proposed in this pull request?

Introducing a limit for pending PODs (newly created/requested executors included).
This limit is global for all the resource profiles. So first we have to count all the newly created and pending PODs (decreased by the ones which have been requested to be deleted), then we can share the remaining pending POD slots among the resource profiles.
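
A toy sketch of the counting described above; the names are illustrative and not the actual Spark K8s allocator code:

```scala
// Sum the outstanding pods across all resource profiles (minus the ones already
// requested for deletion) and share what is left of the global pending-pod budget.
case class ProfileSnapshot(newlyCreated: Int, pendingPods: Int, pendingDeletion: Int)

def remainingPendingSlots(maxPendingPods: Int, profiles: Seq[ProfileSnapshot]): Int = {
  val outstanding = profiles.map(p => p.newlyCreated + p.pendingPods - p.pendingDeletion).sum
  math.max(0, maxPendingPods - outstanding)
}

// e.g. remainingPendingSlots(100, Seq(ProfileSnapshot(10, 25, 5), ProfileSnapshot(0, 40, 0))) == 30
```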

### Why are the changes needed?

Without this PR, dynamic allocation could request too many PODs, the K8S scheduler could be overloaded, and the scheduling of PODs would be affected by the load.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

With new unit tests.

Closes #33492 from attilapiros/SPARK-36052.

Authored-by: attilapiros <piros.attila.zsolt@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-08-10 20:16:21 -07:00
Jungtaek Lim ed60aaa9f1 [SPARK-36463][SS] Prohibit update mode in streaming aggregation with session window
### What changes were proposed in this pull request?

This PR proposes to prohibit update mode in streaming aggregation with session window.

UnsupportedOperationChecker will check and prohibit the case. As a side effect, this PR also simplifies the code as we can remove the implementation of iterator to support outputs of update mode.

This PR also cleans up test code via deduplicating.

### Why are the changes needed?

The semantics of "update" mode for session window based streaming aggregation are quite unclear.

For normal streaming aggregation, Spark will provide the outputs which can be "upsert"ed based on the grouping key. This is based on the fact grouping key won't be changed.

This doesn't hold true for session window based streaming aggregation, as session range is changing.

If end users leverage their knowledge about streaming aggregation, they will consider the key to be grouping key + session (since they'll specify these things in groupBy), and it is highly likely that the existing row is not updated (overwritten), ending up with different rows.

If end users consider the key to be the grouping key, there's a small chance for end users to upsert the session correctly, though only the last updated session will be stored, so it won't work with event-time processing, where there could be multiple active sessions.

### Does this PR introduce _any_ user-facing change?

No, as we haven't released this feature.

### How was this patch tested?

Updated tests.

Closes #33689 from HeartSaVioR/SPARK-36463.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
2021-08-11 10:45:52 +09:00
Min Shen 3f09093a21 [SPARK-36378][SHUFFLE] Switch to using RPCResponse to communicate common block push failures to the client
### What changes were proposed in this pull request?

We have run performance evaluations on the version of push-based shuffle committed to upstream so far, and have identified a few places for further improvements:
1. On the server side, we have noticed that the usage of `String.format`, especially when receiving a block push request, has a much higher overhead compared with string concatenation.
2. On the server side, the usage of `Throwables.getStackTraceAsString` in the `ErrorHandler.shouldRetryError` and `ErrorHandler.shouldLogError` has generated quite some overhead.

These 2 issues are related to how we are currently handling certain common block push failures.
We are communicating such failures via `RPCFailure` by transmitting the exception stack trace.
This generates overhead on both the server and the client side for creating these exceptions, and makes checking the type of failure fragile and inefficient, since it relies on string matching of the exception stack trace.
To address these, this PR also proposes to encode the common block push failure as an error code and send that back to the client with a proper RPC message.

### Why are the changes needed?

Improve shuffle service efficiency for push-based shuffle.
Improve code robustness for handling block push failures.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit tests.

Closes #33613 from Victsm/SPARK-36378.

Lead-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Min Shen <victor.nju@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2021-08-10 16:46:55 -05:00
Kazuyuki Tanimura c888bad6a1 [SPARK-36464][CORE] Fix Underlying Size Variable Initialization in ChunkedByteBufferOutputStream for Writing Over 2GB Data
### What changes were proposed in this pull request?
The `size` method of `ChunkedByteBufferOutputStream` returns a `Long` value; however, the underlying `_size` variable is initialized as an `Int`.
That causes an overflow and returns a negative size when more than 2GB of data is written into `ChunkedByteBufferOutputStream`.

This PR proposes to change the underlying `_size` variable from `Int` to `Long` at the initialization
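
A tiny standalone illustration of the overflow:

```scala
// Tracking the written byte count in an Int wraps past 2GB, while a Long does not.
val written = 3L * 1024 * 1024 * 1024   // 3GB written in total
println(written.toInt)                  // negative number: the Int has overflowed
println(written)                        // 3221225472: correct when kept as Long
```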

### Why are the changes needed?
Because the `size` method of `ChunkedByteBufferOutputStream` incorrectly returns a negative value when more than 2GB of data is written.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Passed existing tests
```
build/sbt "core/testOnly *ChunkedByteBufferOutputStreamSuite"
```
Also added a new unit test
```
build/sbt "core/testOnly *ChunkedByteBufferOutputStreamSuite – -z SPARK-36464"
```

Closes #33690 from kazuyukitanimura/SPARK-36464.

Authored-by: Kazuyuki Tanimura <ktanimura@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2021-08-10 10:29:54 -07:00
gengjiaan 186815be1c [SPARK-36429][SQL] JacksonParser should throw exception when data type unsupported
### What changes were proposed in this pull request?
Currently, when `spark.sql.timestampType=TIMESTAMP_NTZ` is set, the behavior differs between `from_json` and `from_csv`.
```
-- !query
select from_json('{"t":"26/October/2015"}', 't Timestamp', map('timestampFormat', 'dd/MMMMM/yyyy'))
-- !query schema
struct<from_json({"t":"26/October/2015"}):struct<t:timestamp_ntz>>
-- !query output
{"t":null}
```

```
-- !query
select from_csv('26/October/2015', 't Timestamp', map('timestampFormat', 'dd/MMMMM/yyyy'))
-- !query schema
struct<>
-- !query output
java.lang.Exception
Unsupported type: timestamp_ntz
```

We should make `from_json` throw an exception too.
This PR addresses the discussion below:
https://github.com/apache/spark/pull/33640#discussion_r682862523

### Why are the changes needed?
Make the behavior of `from_json` more reasonable.

### Does this PR introduce _any_ user-facing change?
'Yes'.
`from_json` now throws an exception when we set spark.sql.timestampType=TIMESTAMP_NTZ.

### How was this patch tested?
Tests updated.

Closes #33684 from beliefer/SPARK-36429-new.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-10 22:52:20 +08:00
Angerszhuuuu 89d8a4eacf [SPARK-36431][SQL] Support TypeCoercion of ANSI intervals with different fields
### What changes were proposed in this pull request?
 Support TypeCoercion of ANSI intervals with different fields

### Why are the changes needed?
 Support TypeCoercion of ANSI intervals with different fields

### Does this PR introduce _any_ user-facing change?
After this PR users can:
 - use comparison functions with different fields of DayTimeIntervalType/YearMonthIntervalType, such as `INTERVAL '1' YEAR` > `INTERVAL '11' MONTH`
 - use different fields of ANSI interval types in collection functions such as `array(INTERVAL '1' YEAR, INTERVAL '11' MONTH)`
 - use different fields of ANSI interval types in `coalesce`, etc.

### How was this patch tested?
Added UT

Closes #33661 from AngersZhuuuu/SPARK-SPARK-36431.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2021-08-10 14:22:31 +03:00
Cheng Pan 7f56b73cad [SPARK-36466][SQL] Table in unloaded catalog referenced by view should load correctly
### What changes were proposed in this pull request?

Retain `spark.sql.catalog.*` confs when resolving view.

### Why are the changes needed?

Currently, if a view in the default catalog references a table in another catalog (e.g. JDBC), `org.apache.spark.sql.AnalysisException: Table or view not found: cat.t` will be thrown when accessing the view if that catalog has not been loaded yet.

### Does this PR introduce _any_ user-facing change?

Yes, bug fix.

### How was this patch tested?

Add UT.

Closes #33692 from pan3793/SPARK-36466.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2021-08-10 17:31:21 +08:00