### What changes were proposed in this pull request?
This patch proposes to reuse KafkaSourceInitialOffsetWriter to remove identical code in KafkaSource.
Credit to jaceklaskowski for finding this.
https://lists.apache.org/thread.html/7faa6ac29d871444eaeccefc520e3543a77f4362af4bb0f12a3f7cb2%3Cdev.spark.apache.org%3E
### Why are the changes needed?
The code is duplicated with identical code, which opens the chance to maintain the code separately and might end up with bugs not addressed one side.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Existing UTs, as it's simple refactor.
Closes#25583 from HeartSaVioR/MINOR-SS-reuse-KafkaSourceInitialOffsetWriter.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
### What changes were proposed in this pull request?
When Task retry happens with Kafka source then it's not known whether the consumer is the issue so the old consumer removed from cache and new consumer created. The feature works fine but not covered with tests.
In this PR I've added such test for DStreams + Structured Streaming.
### Why are the changes needed?
No such tests are there.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing + new unit tests.
Closes#25582 from gaborgsomogyi/SPARK-28875.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
### What changes were proposed in this pull request?
i broke run-tests.py for non-PRB builds in this PR:
https://github.com/apache/spark/pull/25423
### Why are the changes needed?
to fix what i broke
### Does this PR introduce any user-facing change?
no
### How was this patch tested?
the build system will test this
Closes#25585 from shaneknapp/fix-run-tests.
Authored-by: shane knapp <incomplete@gmail.com>
Signed-off-by: shane knapp <incomplete@gmail.com>
## What changes were proposed in this pull request?
This fixes issues that can arise when the jars for different hadoop versions mix, and short-circuits the case where we are running with a spark that was not built for yarn 3 (resource support).
## How was this patch tested?
I tested it manually.
Closes#25403 from abellina/SPARK-28679.
Authored-by: Alessandro Bellina <abellina@nvidia.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
The shuffle writer API introduced in SPARK-28209 has a flaw that leads to a memory usage regression - we ended up tracking the partition lengths in two places. Here, we modify the API slightly to avoid redundant tracking. The implementation of the shuffle writer plugin is now responsible for tracking the lengths of partitions, and propagating this back up to the higher shuffle writer as part of the commitAllPartitions API.
## How was this patch tested?
Existing unit tests.
Closes#25341 from mccheah/dont-redundantly-store-part-lengths.
Authored-by: mcheah <mcheah@palantir.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
we need to add the ability to test PRBs against java11.
see comments here: https://github.com/apache/spark/pull/25405
## How was this patch tested?
the build system will test this.
Closes#25423 from shaneknapp/spark-prb-java11.
Authored-by: shane knapp <incomplete@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
In my application spark streaming is restarted programmatically by stopping StreamingContext without stopping of SparkContext and creating/starting a new one. I use it for automatic detection of Kafka topic/partition changes and automatic failover in case of non fatal exceptions.
However i notice that after multiple restarts driver fails with OOM. During investigation of heap dump i figured out that StreamingContext object isn't cleared by GC after stopping.
<img width="1901" alt="Screen Shot 2019-08-14 at 12 23 33" src="https://user-images.githubusercontent.com/13151161/63010149-83f4c200-be8e-11e9-9f48-12b6e97839f4.png">
There are several places which holds reference to it :
1. StreamingTab registers StreamingJobProgressListener which holds reference to Streaming Context directly to LiveListenerBus shared queue via ssc.sc.addSparkListener(listener) method invocation. However this listener isn't unregistered at stop method.
2. json handlers (/streaming/json and /streaming/batch/json) aren't unregistered in SparkUI, while they hold reference to StreamingJobProgressListener. Basically the same issue affects all the pages, i assume that renderJsonHandler should be added to pageToHandlers cache on attachPage method invocation in order to unregistered it as well on detachPage.
3. SparkUi holds reference to StreamingJobProgressListener in the corresponding local variable which isn't cleared after stopping of StreamingContext.
## How was this patch tested?
Added tests to existing test suites.
After i applied these changes via reflection in my app OOM on driver side gone.
Closes#25439 from choojoyq/SPARK-28709-fix-streaming-context-leak-on-stop.
Authored-by: Nikita Gorbachevsky <nikitag@playtika.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This PR aims at improving the way physical plans are explained in spark.
Currently, the explain output for physical plan may look very cluttered and each operator's
string representation can be very wide and wraps around in the display making it little
hard to follow. This especially happens when explaining a query 1) Operating on wide tables
2) Has complex expressions etc.
This PR attempts to split the output into two sections. In the header section, we display
the basic operator tree with a number associated with each operator. In this section, we strictly
control what we output for each operator. In the footer section, each operator is verbosely
displayed. Based on the feedback from Maryann, the uncorrelated subqueries (SubqueryExecs) are not included in the main plan. They are printed separately after the main plan and can be
correlated by the originating expression id from its parent plan.
To illustrate, here is a simple plan displayed in old vs new way.
Example query1 :
```
EXPLAIN SELECT key, Max(val) FROM explain_temp1 WHERE key > 0 GROUP BY key HAVING max(val) > 0
```
Old :
```
*(2) Project [key#2, max(val)#15]
+- *(2) Filter (isnotnull(max(val#3)#18) AND (max(val#3)#18 > 0))
+- *(2) HashAggregate(keys=[key#2], functions=[max(val#3)], output=[key#2, max(val)#15, max(val#3)#18])
+- Exchange hashpartitioning(key#2, 200)
+- *(1) HashAggregate(keys=[key#2], functions=[partial_max(val#3)], output=[key#2, max#21])
+- *(1) Project [key#2, val#3]
+- *(1) Filter (isnotnull(key#2) AND (key#2 > 0))
+- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), (key#2 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), GreaterThan(key,0)], ReadSchema: struct<key:int,val:int>
```
New :
```
Project (8)
+- Filter (7)
+- HashAggregate (6)
+- Exchange (5)
+- HashAggregate (4)
+- Project (3)
+- Filter (2)
+- Scan parquet default.explain_temp1 (1)
(1) Scan parquet default.explain_temp1 [codegen id : 1]
Output: [key#2, val#3]
(2) Filter [codegen id : 1]
Input : [key#2, val#3]
Condition : (isnotnull(key#2) AND (key#2 > 0))
(3) Project [codegen id : 1]
Output : [key#2, val#3]
Input : [key#2, val#3]
(4) HashAggregate [codegen id : 1]
Input: [key#2, val#3]
(5) Exchange
Input: [key#2, max#11]
(6) HashAggregate [codegen id : 2]
Input: [key#2, max#11]
(7) Filter [codegen id : 2]
Input : [key#2, max(val)#5, max(val#3)#8]
Condition : (isnotnull(max(val#3)#8) AND (max(val#3)#8 > 0))
(8) Project [codegen id : 2]
Output : [key#2, max(val)#5]
Input : [key#2, max(val)#5, max(val#3)#8]
```
Example Query2 (subquery):
```
SELECT * FROM explain_temp1 WHERE KEY = (SELECT Max(KEY) FROM explain_temp2 WHERE KEY = (SELECT Max(KEY) FROM explain_temp3 WHERE val > 0) AND val = 2) AND val > 3
```
Old:
```
*(1) Project [key#2, val#3]
+- *(1) Filter (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#39)) AND (val#3 > 3))
: +- Subquery scalar-subquery#39
: +- *(2) HashAggregate(keys=[], functions=[max(KEY#26)], output=[max(KEY)#45])
: +- Exchange SinglePartition
: +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#26)], output=[max#47])
: +- *(1) Project [key#26]
: +- *(1) Filter (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#38)) AND (val#27 = 2))
: : +- Subquery scalar-subquery#38
: : +- *(2) HashAggregate(keys=[], functions=[max(KEY#28)], output=[max(KEY)#43])
: : +- Exchange SinglePartition
: : +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#28)], output=[max#49])
: : +- *(1) Project [key#28]
: : +- *(1) Filter (isnotnull(val#29) AND (val#29 > 0))
: : +- *(1) FileScan parquet default.explain_temp3[key#28,val#29] Batched: true, DataFilters: [isnotnull(val#29), (val#29 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp3], PartitionFilters: [], PushedFilters: [IsNotNull(val), GreaterThan(val,0)], ReadSchema: struct<key:int,val:int>
: +- *(1) FileScan parquet default.explain_temp2[key#26,val#27] Batched: true, DataFilters: [isnotnull(key#26), isnotnull(val#27), (val#27 = 2)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp2], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)], ReadSchema: struct<key:int,val:int>
+- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), isnotnull(val#3), (val#3 > 3)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)], ReadSchema: struct<key:int,val:int>
```
New:
```
Project (3)
+- Filter (2)
+- Scan parquet default.explain_temp1 (1)
(1) Scan parquet default.explain_temp1 [codegen id : 1]
Output: [key#2, val#3]
(2) Filter [codegen id : 1]
Input : [key#2, val#3]
Condition : (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#23)) AND (val#3 > 3))
(3) Project [codegen id : 1]
Output : [key#2, val#3]
Input : [key#2, val#3]
===== Subqueries =====
Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery scalar-subquery#23
HashAggregate (9)
+- Exchange (8)
+- HashAggregate (7)
+- Project (6)
+- Filter (5)
+- Scan parquet default.explain_temp2 (4)
(4) Scan parquet default.explain_temp2 [codegen id : 1]
Output: [key#26, val#27]
(5) Filter [codegen id : 1]
Input : [key#26, val#27]
Condition : (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#22)) AND (val#27 = 2))
(6) Project [codegen id : 1]
Output : [key#26]
Input : [key#26, val#27]
(7) HashAggregate [codegen id : 1]
Input: [key#26]
(8) Exchange
Input: [max#35]
(9) HashAggregate [codegen id : 2]
Input: [max#35]
Subquery:2 Hosting operator id = 5 Hosting Expression = Subquery scalar-subquery#22
HashAggregate (15)
+- Exchange (14)
+- HashAggregate (13)
+- Project (12)
+- Filter (11)
+- Scan parquet default.explain_temp3 (10)
(10) Scan parquet default.explain_temp3 [codegen id : 1]
Output: [key#28, val#29]
(11) Filter [codegen id : 1]
Input : [key#28, val#29]
Condition : (isnotnull(val#29) AND (val#29 > 0))
(12) Project [codegen id : 1]
Output : [key#28]
Input : [key#28, val#29]
(13) HashAggregate [codegen id : 1]
Input: [key#28]
(14) Exchange
Input: [max#37]
(15) HashAggregate [codegen id : 2]
Input: [max#37]
```
Note:
I opened this PR as a WIP to start getting feedback. I will be on vacation starting tomorrow
would not be able to immediately incorporate the feedback. I will start to
work on them as soon as i can. Also, currently this PR provides a basic infrastructure
for explain enhancement. The details about individual operators will be implemented
in follow-up prs
## How was this patch tested?
Added a new test `explain.sql` that tests basic scenarios. Need to add more tests.
Closes#24759 from dilipbiswal/explain_feature.
Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Test `spark.sql.redaction.options.regex` with and without default values.
### Why are the changes needed?
Normally, we do not rely on the default value of `spark.sql.redaction.options.regex`.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
N/A
Closes#25579 from wangyum/SPARK-28642-f1.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
### What changes were proposed in this pull request?
This PR implements `SparkGetCatalogsOperation` for Thrift Server metadata completeness.
### Why are the changes needed?
Thrift Server metadata completeness.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit test
Closes#25555 from wangyum/SPARK-28852.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
### What changes were proposed in this pull request?
1. Fix the physical plan (`DescribeTableExec`) to have the same output attributes as the corresponding logical plan.
2. Remove `output` in statements since they are unresolved plans.
### Why are the changes needed?
Correctness of how output attributes should work.
### Does this PR introduce any user-facing change?
NO
### How was this patch tested?
Existing tests
Closes#25568 from imback82/describe_table.
Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR aims to specify Jekyll Version explicitly in our release docker image.
### Why are the changes needed?
Recently, Jekyll 4.0 is released and it dropped Ruby 2.3 support.
This breaks our release docker image build.
```
Building native extensions. This could take a while...
ERROR: Error installing jekyll:
jekyll-sass-converter requires Ruby version >= 2.4.0.
```
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
The following should succeed.
```
$ docker build -t spark-rm:test --build-arg UID=501 dev/create-release/spark-rm
...
Successfully tagged spark-rm:test
```
Closes#25578 from dongjoon-hyun/SPARK-28868.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Resolves [SPARK-28778: Shuffle jobs fail due to incorrect advertised address when running in a virtual network on Mesos](https://issues.apache.org/jira/browse/SPARK-28778).
This patch fixes a bug which occurs when shuffle jobs are launched by Mesos in a virtual network. Mesos scheduler sets executor `--hostname` parameter to `0.0.0.0` in the case when `spark.mesos.network.name` is provided. This makes executors use `0.0.0.0` as their advertised address and, in the presence of shuffle, executors fail to fetch shuffle blocks from each other using `0.0.0.0` as the origin. When a virtual network is used the hostname or IP address is not known upfront and assigned to a container at its start time so the executor process needs to advertise the correct dynamically assigned address to be reachable by other executors.
Changes:
- added a fallback to `Utils.localHostName()` in Spark Executors when `--hostname` is not provided
- removed setting executor address to `0.0.0.0` from Mesos scheduler
- refactored the code related to building executor command in Mesos scheduler
- added network configuration support to Docker containerizer
- added unit tests
### Why are the changes needed?
The bug described above prevents Mesos users from running any jobs which involve shuffle due to the inability of executors to fetch shuffle blocks because of incorrect advertised address when virtual network is used.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
- added unit test to `MesosCoarseGrainedSchedulerBackendSuite` which verifies the absence of `--hostname` parameter when `spark.mesos.network.name` is provided and its presence otherwise
- added unit test to `MesosSchedulerBackendUtilSuite` which verifies that `MesosSchedulerBackendUtil.buildContainerInfo` sets network-related properties for Docker containerizer
- unit tests from this repo launched with profiles: `./build/mvn test -Pmesos -Pnetlib-lgpl -Psparkr -Phive -Phive-thriftserver`, build log attached: [mvn.test.log](https://github.com/apache/spark/files/3516891/mvn.test.log)
- integration tests from [DCOS Spark repo](https://github.com/mesosphere/spark-build), more specifically - [test_spark_cni.py](https://github.com/mesosphere/spark-build/blob/master/tests/test_spark_cni.py) which runs a specific [shuffle job](https://github.com/mesosphere/spark-build/blob/master/tests/jobs/scala/src/main/scala/ShuffleApp.scala) and verifies its successful completion, Mesos task network configuration, and IP addresses for both Mesos and Docker containerizers
Closes#25500 from akirillov/DCOS-45840-fix-advertised-ip-in-virtual-networks.
Authored-by: Anton Kirillov <akirillov@mesosophere.io>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
expose the newly added tree-based transformation in the py side
### Why are the changes needed?
function parity
### Does this PR introduce any user-facing change?
yes, add `setLeafCol` & `getLeafCol` in the py side
### How was this patch tested?
added tests & local tests
Closes#25566 from zhengruifeng/py_tree_path.
Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
This reverts commit 485ae6d181.
Closes#25563 from gatorsmile/revert.
Authored-by: Xiao Li <gatorsmile@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
To follow ANSI SQL, we should support a configurable mode that throws exceptions when casting to integers causes overflow.
The behavior is similar to https://issues.apache.org/jira/browse/SPARK-26218, which throws exceptions on arithmetical operation overflow.
To unify it, the configuration is renamed from "spark.sql.arithmeticOperations.failOnOverFlow" to "spark.sql.failOnIntegerOverFlow"
## How was this patch tested?
Unit test
Closes#25461 from gengliangwang/AnsiCastIntegral.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Add id to Exchange and Subquery's stringArgs method for easier identifying their reuses in query plans, for example:
```
ReusedExchange d_date_sk#827, BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))) [id=#2710]
```
Where `2710` is the id of the reused exchange.
## How was this patch tested?
Passes existing tests
Closes#25434 from dbaliafroozeh/ImplementStringArgsExchangeSubqueryExec.
Authored-by: Ali Afroozeh <ali.afroozeh@databricks.com>
Signed-off-by: herman <herman@databricks.com>
### What changes were proposed in this pull request?
This PR removes the `canonicalize(attrs: AttributeSeq)` from `PlanExpression` and taking care of normalizing expressions in `QueryPlan`.
### Why are the changes needed?
`Expression` has already a `canonicalized` method and having the `canonicalize` method in `PlanExpression` is confusing.
### Does this PR introduce any user-facing change?
Removes the `canonicalize` plan from `PlanExpression`. Also renames the `normalizeExprId` to `normalizeExpressions` in query plan.
### How was this patch tested?
This PR is a refactoring and passes the existing tests
Closes#25534 from dbaliafroozeh/ImproveCanonicalizeAPI.
Authored-by: Ali Afroozeh <ali.afroozeh@databricks.com>
Signed-off-by: herman <herman@databricks.com>
### What changes were proposed in this pull request?
This PR aims to clean up the commit logs by removing the comments of our PR template.
### Why are the changes needed?
Apache Spark PR template has comments. Sometime we forget to clean up them because GitHub hides them nicely. It would be great if we clean up this. Otherwise, this makes the commit logs too verbose. (There are a few commits already.)
### Does this PR introduce any user-facing change?
No. (only for committers)
### How was this patch tested?
Manually with Python2/Python3.
Closes#25564 from dongjoon-hyun/SPARK-28857.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Implements the SHOW TABLES logical and physical plans for data source v2 tables.
## How was this patch tested?
Added unit tests to `DataSourceV2SQLSuite`.
Closes#25247 from imback82/dsv2_show_tables.
Lead-authored-by: terryk <yuminkim@gmail.com>
Co-authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR extracts the schema information of TPCDS tables into a separate class called `TPCDSSchema` which can be reused for other testing purposes
### How was this patch tested?
This PR is only a refactoring for tests and passes existing tests
Closes#25535 from dbaliafroozeh/IntroduceTPCDSSchema.
Authored-by: Ali Afroozeh <ali.afroozeh@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR fixes the leak of crc files from CheckpointFileManager when FileContextBasedCheckpointFileManager is being used.
Spark hits the Hadoop bug, [HADOOP-16255](https://issues.apache.org/jira/browse/HADOOP-16255) which seems to be a long-standing issue.
This is there're two `renameInternal` methods:
```
public void renameInternal(Path src, Path dst)
public void renameInternal(final Path src, final Path dst, boolean overwrite)
```
which should be overridden to handle all cases but ChecksumFs only overrides method with 2 params, so when latter is called FilterFs.renameInternal(...) is called instead, and it will do rename with RawLocalFs as underlying filesystem.
The bug is related to FileContext, so FileSystemBasedCheckpointFileManager is not affected.
[SPARK-17475](https://issues.apache.org/jira/browse/SPARK-17475) took a workaround for this bug, but [SPARK-23966](https://issues.apache.org/jira/browse/SPARK-23966) seemed to bring regression.
This PR deletes crc file as "best-effort" when renaming, as failing to delete crc file is not that critical to fail the task.
### Why are the changes needed?
This PR prevents crc files not being cleaned up even purging batches. Too many files in same directory often hurts performance, as well as each crc file occupies more space than its own size so possible to occupy nontrivial amount of space when batches go up to 100000+.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Some unit tests are modified to check leakage of crc files.
Closes#25488 from HeartSaVioR/SPARK-28025.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?
After all the discussions in the dev list: http://apache-spark-developers-list.1001551.n3.nabble.com/Discuss-Follow-ANSI-SQL-on-table-insertion-td27531.html#a27562.
Here I propose that we can make the store assignment rules in the analyzer configurable, and the behavior of V1 and V2 should be consistent.
When inserting a value into a column with a different data type, Spark will perform type coercion. After this PR, we support 2 policies for the type coercion rules:
legacy and strict.
1. With legacy policy, Spark allows casting any value to any data type. The legacy policy is the only behavior in Spark 2.x and it is compatible with Hive.
2. With strict policy, Spark doesn't allow any possible precision loss or data truncation in type coercion, e.g. `int` and `long`, `float` -> `double` are not allowed.
Eventually, the "legacy" mode will be removed, so it is disallowed in data source V2.
To ensure backward compatibility with existing queries, the default store assignment policy for data source V1 is "legacy".
## How was this patch tested?
Unit test
Closes#25453 from gengliangwang/tableInsertRule.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Added proper message instead of NPE for invalid Dataset operations (e.g. calling actions inside of transformations) similar to SPARK-5063 for RDD
### Why are the changes needed?
To report the user about the exact issue instead of NPE
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Manually tested
```scala
test code snap
"import spark.implicits._
val ds1 = spark.sparkContext.parallelize(1 to 100, 100).toDS()
val ds2 = spark.sparkContext.parallelize(1 to 100, 100).toDS()
ds1.map(x => {
// scalastyle:off
println(ds2.count + x)
x
}).collect()"
```
Closes#25503 from shivusondur/jira28702.
Authored-by: shivusondur <shivusondur@gmail.com>
Signed-off-by: Josh Rosen <rosenville@gmail.com>
### What changes were proposed in this pull request?
Improved warning message in Barrier Execution Mode when required slots > maximum slots.
The new message contains information about required slots, maximum slots and how many times retry failed.
### Why are the changes needed?
Providing to users with the number of required slots, maximum slots and how many times retry failed might help users to decide what they should do.
For example, continuing to wait for retry succeeded or killing jobs.
### Does this PR introduce any user-facing change?
Yes.
If `spark.scheduler.barrier.maxConcurrentTaskCheck.maxFailures=3`, we get following warning message.
Before applying this change:
```
19/08/18 15:18:09 WARN DAGScheduler: The job 2 requires to run a barrier stage that requires more slots than the total number of slots in the cluster currently.
19/08/18 15:18:24 WARN DAGScheduler: The job 2 requires to run a barrier stage that requires more slots than the total number of slots in the cluster currently.
19/08/18 15:18:39 WARN DAGScheduler: The job 2 requires to run a barrier stage that requires more slots than the total number of slots in the cluster currently.
19/08/18 15:18:54 WARN DAGScheduler: The job 2 requires to run a barrier stage that requires more slots than the total number of slots in the cluster currently.
org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed: [SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires more slots than the total number of slots in the cluster currently. Please init a new cluster with more CPU cores or repartition the input RDD(s) to reduce the number of slots required to run this barrier stage.
at org.apache.spark.scheduler.DAGScheduler.checkBarrierStageWithNumSlots(DAGScheduler.scala:439)
at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:453)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:983)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2140)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2132)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2121)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:749)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2080)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2145)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:961)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:366)
at org.apache.spark.rdd.RDD.collect(RDD.scala:960)
... 47 elided
```
After applying this change:
```
19/08/18 16:52:23 WARN DAGScheduler: The job 0 requires to run a barrier stage that requires 3 slots than the total number of slots(2) in the cluster currently.
19/08/18 16:52:38 WARN DAGScheduler: The job 0 requires to run a barrier stage that requires 3 slots than the total number of slots(2) in the cluster currently (Retry 1/3 failed).
19/08/18 16:52:53 WARN DAGScheduler: The job 0 requires to run a barrier stage that requires 3 slots than the total number of slots(2) in the cluster currently (Retry 2/3 failed).
19/08/18 16:53:08 WARN DAGScheduler: The job 0 requires to run a barrier stage that requires 3 slots than the total number of slots(2) in the cluster currently (Retry 3/3 failed).
org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed: [SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires more slots than the total number of slots in the cluster currently. Please init a new cluster with more CPU cores or repartition the input RDD(s) to reduce the number of slots required to run this barrier stage.
at org.apache.spark.scheduler.DAGScheduler.checkBarrierStageWithNumSlots(DAGScheduler.scala:439)
at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:453)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:983)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2140)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2132)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2121)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:749)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2080)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2145)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:961)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:366)
at org.apache.spark.rdd.RDD.collect(RDD.scala:960)
... 47 elided
```
### How was this patch tested?
I tested manually using Spark Shell with following configuration and script. And then, checked log message.
```
$ bin/spark-shell --master local[2] --conf spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures=3
scala> sc.parallelize(1 to 100, sc.defaultParallelism+1).barrier.mapPartitions(identity(_)).collect
```
Closes#25487 from sarutak/barrier-exec-mode-warning-message.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Tree-based feature transformation is a widely used feature and already implemented in many famous libraries, like sklearn/xgboost/lightgbm/catboost. But is still missing in ML.
The previous discussions and design doc can be found in [SPARK-13677](https://issues.apache.org/jira/browse/SPARK-13677), which is the only left subtask in 'GBT improvement umbrella' [SPARK-14047](https://issues.apache.org/jira/browse/SPARK-14047).
This pr is to add tree-based feature transformation.
## How was this patch tested?
existing and added suites
Closes#25383 from zhengruifeng/tree_path.
Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
<!--
Thanks for sending a pull request! Here are some tips for you:
1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
4. Be sure to keep the PR description updated to reflect all changes.
5. Please write your PR title to summarize what this PR proposes.
6. If possible, provide a concise example to reproduce the issue for a faster review.
-->
### What changes were proposed in this pull request?
SparkML writer gets hadoop conf from session state, instead of the spark context.
<!--
Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue.
If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
2. If you fix some SQL features, you can provide some references of other DBMSes.
3. If there is design documentation, please add the link.
4. If there is a discussion in the mailing list, please add the link.
-->
### Why are the changes needed?
Allow for multiple sessions in the same context that have different hadoop configurations.
<!--
Please clarify why the changes are needed. For instance,
1. If you propose a new API, clarify the use case for a new API.
2. If you fix a bug, you can clarify why it is a bug.
-->
### Does this PR introduce any user-facing change?
<!--
If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
If no, write 'No'.
-->
No
### How was this patch tested?
<!--
If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
If tests were not added, please describe why they were not added and/or why it was difficult to add.
-->
Tested in pyspark.ml.tests.test_persistence.PersistenceTest test_default_read_write
Closes#25505 from helenyugithub/SPARK-28776.
Authored-by: heleny <heleny@palantir.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
This PR aims to annotate `HiveExternalCatalogVersionsSuite` with `ExtendedHiveTest`.
### Why are the changes needed?
`HiveExternalCatalogVersionsSuite` is an outstanding test in terms of testing time. This PR aims to allow skipping this test suite when we use `ExtendedHiveTest`.
![time](https://user-images.githubusercontent.com/9700541/63489184-4c75af00-c466-11e9-9e12-d250d4a23292.png)
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Since Jenkins doesn't exclude `ExtendedHiveTest`, there is no difference in Jenkins testing.
This PR should be tested by manually by the following.
**BEFORE**
```
$ cd sql/hive
$ mvn package -Dtest=none -DwildcardSuites=org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite -Dtest.exclude.tags=org.apache.spark.tags.ExtendedHiveTest
...
Run starting. Expected test count is: 1
HiveExternalCatalogVersionsSuite:
22:32:16.218 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load ...
```
**AFTER**
```
$ cd sql/hive
$ mvn package -Dtest=none -DwildcardSuites=org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite -Dtest.exclude.tags=org.apache.spark.tags.ExtendedHiveTest
...
Run starting. Expected test count is: 0
HiveExternalCatalogVersionsSuite:
Run completed in 772 milliseconds.
Total number of tests run: 0
Suites: completed 2, aborted 0
Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
No tests were executed.
...
```
Closes#25550 from dongjoon-hyun/SPARK-28847.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Fix minor typo in SQLConf.
`FILE_COMRESSION_FACTOR` -> `FILE_COMPRESSION_FACTOR`
### Why are the changes needed?
Make conf more understandable.
### Does this PR introduce any user-facing change?
No. (`spark.sql.sources.fileCompressionFactor` is unchanged.)
### How was this patch tested?
Pass the Jenkins with the existing tests.
Closes#25538 from triplesheep/TYPO-FIX.
Authored-by: triplesheep <triplesheep0419@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Spark uses Netty 4 directly, but also includes Netty 3 only because transitive dependencies do. The dependencies (Hadoop HDFS, Zookeeper, Avro) don't seem to need this dependency as used in Spark. I think we can forcibly remove it to slim down the dependencies.
Previous attempts were blocked by its usage in Flume, but that dependency has gone away.
https://github.com/apache/spark/pull/15436
### Why are the changes needed?
Mostly to reduce the transitive dependency size and complexity a little bit and avoid triggering spurious security alerts on Netty 3.x usage.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Existing tests
Closes#25544 from srowen/SPARK-17875.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR adds a simple cost model and a mechanism to compare the costs of the before and after plans of each re-optimization in Adaptive Query Execution. Now the workflow of AQE re-optimization is changed to: If the cost of the plan after re-optimization is lower than or equal to that of the plan before re-optimization and the plan has been changed after re-optimization (if equal), the current physical plan will be updated to the plan after re-optimization, otherwise it will remain unchanged until the next re-optimization.
### Why are the changes needed?
This new mechanism is to prevent regressions in Adaptive Query Execution caused by change of the plan introducing extra cost, in this PR specifically, change of SMJ to BHJ leading to extra `ShuffleExchangeExec`s.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Added UT.
Closes#25456 from maryannxue/aqe-cost.
Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
<!--
Thanks for sending a pull request! Here are some tips for you:
1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
4. Be sure to keep the PR description updated to reflect all changes.
5. Please write your PR title to summarize what this PR proposes.
6. If possible, provide a concise example to reproduce the issue for a faster review.
-->
### What changes were proposed in this pull request?
<!--
Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue.
If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
2. If you fix some SQL features, you can provide some references of other DBMSes.
3. If there is design documentation, please add the link.
4. If there is a discussion in the mailing list, please add the link.
-->
When running CTAS/RTAS, use the nullable schema of the input query to create the table.
### Why are the changes needed?
<!--
Please clarify why the changes are needed. For instance,
1. If you propose a new API, clarify the use case for a new API.
2. If you fix a bug, you can clarify why it is a bug.
-->
It's very likely to run CTAS/RTAS with non-nullable input query, e.g. `CREATE TABLE t AS SELECT 1`. However, it's surprising to users if they can't write null to this table later. Non-nullable is kind of a constraint of the column and should be specified by users explicitly.
For reference, Postgres also use nullable schema for CTAS:
```
> create table t1(i int not null);
> insert into t1 values (1);
> create table t2 as select i from t1;
> \d+ t1;
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
--------+---------+-----------+----------+---------+---------+--------------+-------------
i | integer | | not null | | plain | |
> \d+ t2;
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
--------+---------+-----------+----------+---------+---------+--------------+-------------
i | integer | | | | plain | |
```
File source V1 has the same behavior.
### Does this PR introduce any user-facing change?
<!--
If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
If no, write 'No'.
-->
Yes, after this PR CTAS/RTAS creates tables with nullable schema, then users can insert null values later.
### How was this patch tested?
<!--
If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
If tests were not added, please describe why they were not added and/or why it was difficult to add.
-->
new test
Closes#25536 from cloud-fan/ctas.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
<!--
Thanks for sending a pull request! Here are some tips for you:
1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
4. Be sure to keep the PR description updated to reflect all changes.
5. Please write your PR title to summarize what this PR proposes.
6. If possible, provide a concise example to reproduce the issue for a faster review.
-->
### What changes were proposed in this pull request?
<!--
Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue.
If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
2. If you fix some SQL features, you can provide some references of other DBMSes.
3. If there is design documentation, please add the link.
4. If there is a discussion in the mailing list, please add the link.
-->
The current namespace/catalog should be set to None at the beginning, so that we can read the new configs when reporting currennt namespace/catalog later.
### Why are the changes needed?
<!--
Please clarify why the changes are needed. For instance,
1. If you propose a new API, clarify the use case for a new API.
2. If you fix a bug, you can clarify why it is a bug.
-->
Fix a bug in CatalogManager, to reflect the change of default catalog config when reporting current catalog.
### Does this PR introduce any user-facing change?
<!--
If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
If no, write 'No'.
-->
No. The current namespace/catalog stuff is still internal right now.
### How was this patch tested?
<!--
If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
If tests were not added, please describe why they were not added and/or why it was difficult to add.
-->
a new test suite
Closes#25521 from cloud-fan/fix.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Burak Yavuz <brkyvz@gmail.com>
## What changes were proposed in this pull request?
Disable using radix sort in ShuffleExchangeExec when we do repartition.
In #20393, we fixed the indeterministic result in the shuffle repartition case by performing a local sort before repartitioning.
But for the newly added sort operation, we use radix sort which is wrong because binary data can't be compared by only the prefix. This makes the sort unstable and fails to solve the indeterminate shuffle output problem.
### Why are the changes needed?
Fix the correctness bug caused by repartition after a shuffle.
### Does this PR introduce any user-facing change?
Yes, user will get the right result in the case of repartition stage rerun.
## How was this patch tested?
Test with `local-cluster[5, 2, 5120]`, use the integrated test below, it can return a right answer 100000000.
```
import scala.sys.process._
import org.apache.spark.TaskContext
val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
throw new Exception("pkill -f -n java".!!)
}
x
}
val r2 = df.distinct.count()
```
Closes#25491 from xuanyuanking/SPARK-28699-fix.
Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
Introduces the collectInPlanAndSubqueries and subqueriesAll methods in QueryPlan that consider all the plans in the query plan, including the ones in nested subqueries.
## How was this patch tested?
Unit test added
Closes#25433 from dbaliafroozeh/IntroduceCollectInPlanAndSubqueries.
Authored-by: Ali Afroozeh <ali.afroozeh@databricks.com>
Signed-off-by: herman <herman@databricks.com>
### What changes were proposed in this pull request?
Delete the incorrect method `def setWeightCol(value: Double): this.type = set(threshold, value)` in `LinearSVCModel`
### Why are the changes needed?
`LinearSVCModel` should not provide this setter, moreover, this method is wrongly defined.
### Does this PR introduce any user-facing change?
yes, a public method is removed
### How was this patch tested?
existing suites
Closes#25510 from zhengruifeng/linearsvc_model_set_weightcol.
Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
<!--
Thanks for sending a pull request! Here are some tips for you:
1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
4. Be sure to keep the PR description updated to reflect all changes.
5. Please write your PR title to summarize what this PR proposes.
6. If possible, provide a concise example to reproduce the issue for a faster review.
-->
### What changes were proposed in this pull request?
<!--
Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue.
If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
2. If you fix some SQL features, you can provide some references of other DBMSes.
3. If there is design documentation, please add the link.
4. If there is a discussion in the mailing list, please add the link.
-->
Dealing with interrupted exception in BarrierTaskContext.barrier()
### Why are the changes needed?
<!--
Please clarify why the changes are needed. For instance,
1. If you propose a new API, clarify the use case for a new API.
2. If you fix a bug, you can clarify why it is a bug.
-->
Interrupted exception will happen in the case sparkContext local property "spark.job.interruptOnCancel" set true.
### Does this PR introduce any user-facing change?
<!--
If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
If no, write 'No'.
-->
No.
### How was this patch tested?
<!--
If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
If tests were not added, please describe why they were not added and/or why it was difficult to add.
-->
UT.
Closes#25519 from WeichenXu123/barrier_fl.
Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
The rule ReuseExchange optimization rule will look for instances of Exchange that have the same plan and convert dedupe them to them to a ReuseExchangeExec instance. In the current Spark codebase all Exchange instances are row based, but if we use the spark.sql.extensions config to put in our own columnar based exchange implementation reuse will throw an exception saying that there was a columnar mismatch.
### Why are the changes needed?
Without it Reused Columnar Exchanges throw an exception
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
I tested this patch by running it against a query that was showing this exact issue and it fixed it.
I also added a very simple unit test that shows the issue.
Closes#25499 from revans2/reused-columnar-exchange.
Authored-by: Robert (Bobby) Evans <bobby@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This PR adds a V1 fallback interface for writing to V2 Tables using V1 Writer interfaces. The only supported SaveMode that will be called on the target table will be an Append. The target table must use V2 interfaces such as `SupportsOverwrite` or `SupportsTruncate` to support Overwrite operations. It is up to the target DataSource implementation if this operation can be atomic or not.
We do not support dynamicPartitionOverwrite, as we cannot call a `commit` method that actually cleans up the data in the partitions that were touched through this fallback.
## How was this patch tested?
Will add tests and example implementation after comments + feedback. This is a proposal at this point.
Closes#25348 from brkyvz/v1WriteFallback.
Lead-authored-by: Burak Yavuz <brkyvz@gmail.com>
Co-authored-by: Burak Yavuz <burak@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
add an example for storage tab
## How was this patch tested?
locally building
Closes#25445 from zhengruifeng/doc_ui_storage.
Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
The expression `IntegralDivide`, which corresponds to the `div` operator, support only integral type. Postgres, though, allows it to work also with decimals.
The PR adds the support to decimal operands for this operation in order to have feature parity with postgres.
## How was this patch tested?
added UTs
Closes#25136 from mgaido91/SPARK-28322.
Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
## What changes were proposed in this pull request?
This change reverts the logic which was introduced as a part of SPARK-24149 and a subsequent followup PR.
With existing logic:
- Spark fails to launch with HDFS federation enabled while trying to get a path to a logical nameservice.
- It gets tokens for unrelated namespaces if they are used in HDFS Federation
- Automatic namespace discovery is supported only if these are on the same cluster.
Rationale for change:
- For accessing data from related namespaces, viewfs should handle getting tokens for spark
- For accessing data from unrelated namespaces(user explicitly specifies them using existing configs) as these could be on the same or different cluster.
(Please fill in changes proposed in this fix)
Revert the changes.
## How was this patch tested?
Ran few manual tests and unit test.
Closes#24785 from dhruve/bug/SPARK-27937.
Authored-by: Dhruve Ashar <dhruveashar@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
### What changes were proposed in this pull request?
This PR changes subquery reuse in Adaptive Query Execution from compile-time static reuse to execution-time dynamic reuse. This PR adds a `ReuseAdaptiveSubquery` rule that applies to a query stage after it is created and before it is executed. The new dynamic reuse enables subqueries to be reused across all different subquery levels.
### Why are the changes needed?
This is an improvement to the current subquery reuse in Adaptive Query Execution, which allows subquery reuse to happen in a lazy fashion as well as at different subquery levels.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Passed existing tests.
Closes#25471 from maryannxue/aqe-dynamic-sub-reuse.
Authored-by: maryannxue <maryannxue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This is a pure refactor PR, which creates a new class `CatalogManager` to track the registered v2 catalogs, and provide the catalog up functionality.
`CatalogManager` also tracks the current catalog/namespace. We will implement corresponding commands in other PRs, like `USE CATALOG my_catalog`
## How was this patch tested?
existing tests
Closes#25368 from cloud-fan/refactor.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
# What changes were proposed in this pull request?
This patch modifies the explanation of guarantee for ForeachWriter as it doesn't guarantee same output for `(partitionId, epochId)`. Refer the description of [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) for more details.
Spark itself still guarantees same output for same epochId (batch) if the preconditions are met, 1) source is always providing the same input records for same offset request. 2) the query is idempotent in overall (indeterministic calculation like now(), random() can break this).
Assuming breaking preconditions as an exceptional case (the preconditions are implicitly required even before), we still can describe the guarantee with `epochId`, though it will be harder to leverage the guarantee: 1) ForeachWriter should implement a feature to track whether all the partitions are written successfully for given `epochId` 2) There's pretty less chance to leverage the fact, as the chance for Spark to successfully write all partitions and fail to checkpoint the batch is small.
Credit to zsxwing on discovering the broken guarantee.
## How was this patch tested?
This is just a documentation change, both on javadoc and guide doc.
Closes#25407 from HeartSaVioR/SPARK-28650.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>