### What changes were proposed in this pull request?
Re-enable some pandas-on-Spark test cases.
### Why are the changes needed?
The pandas version in GitHub Actions has been upgraded, so we can re-enable some pandas-on-Spark test cases.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
Closes#32682 from xinrong-databricks/enable_tests.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Introduce a util function `spark_column_equals` to check whether the underlying expressions of columns are the same.
### Why are the changes needed?
In pandas on Spark, there are several places that check whether the underlying expressions of columns are the same, but it's done one-by-one in each place.
We should introduce a util function for it.
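For reference, a minimal sketch of what such a util could look like; the `_jc`-based comparison is an assumption here, not necessarily the final implementation:
```py
from pyspark.sql import Column

def spark_column_equals(left: Column, right: Column) -> bool:
    """Return True if two columns wrap the same underlying Spark expression."""
    # Delegate to the JVM Column.equals, which compares the wrapped expressions.
    return left._jc.equals(right._jc)
```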
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
The existing tests.
Closes#32680 from ueshin/issues/SPARK-35537/spark_column_equals.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
BinaryType, which represents byte sequence values in Spark, doesn't support data-type-based operations yet. We are going to introduce BinaryOps for it.
### Why are the changes needed?
The data-type-based-operations class should be set for each individual data type, including BinaryType.
In addition, BinaryType has its special way of addition, which means concatenation.
### Does this PR introduce _any_ user-facing change?
Yes.
Before the change:
```py
>>> import pyspark.pandas as ps
>>> psser = ps.Series([b'1', b'2', b'3'])
>>> psser + psser
Traceback (most recent call last):
...
TypeError: Type object was not understood.
>>> psser + b'1'
Traceback (most recent call last):
...
TypeError: Type object was not understood.
```
After the change:
```py
>>> import pyspark.pandas as ps
>>> psser = ps.Series([b'1', b'2', b'3'])
>>> psser + psser
0 [49, 49]
1 [50, 50]
2 [51, 51]
dtype: object
>>> psser + b'1'
0 [49, 49]
1 [50, 49]
2 [51, 49]
dtype: object
```
### How was this patch tested?
Unit tests.
Closes#32665 from xinrong-databricks/datatypeops_binary.
Lead-authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Co-authored-by: xinrong-databricks <47337188+xinrong-databricks@users.noreply.github.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
This PR proposes to introduce ArrayOps, MapOps, and StructOps to handle data-type-based operations for StructType, ArrayType, and MapType separately.
### Why are the changes needed?
StructType, ArrayType, and MapType are not accepted by DataTypeOps now.
We should handle these complex types. Among them:
- ArrayType supports concatenation: for example, ps.Series([[1,2,3]]) + ps.Series([[4,5,6]]) should work the same as pd.Series([[1,2,3]]) + pd.Series([[4,5,6]]), as concatenation.
- StructOps will be helpful to make to/from pandas conversion data-type-based.
### Does this PR introduce _any_ user-facing change?
Yes.
Before the change:
```py
>>> import pyspark.pandas as ps
>>> from pyspark.pandas.config import set_option
>>> set_option("compute.ops_on_diff_frames", True)
>>> ps.Series([[1, 2, 3]]) + ps.Series([[0.4, 0.5]])
Traceback (most recent call last):
...
TypeError: Type object was not understood.
>>> ps.Series([[1, 2, 3]]) + ps.Series([[4, 5]])
Traceback (most recent call last):
...
TypeError: Type object was not understood.
>>> ps.Series([[1, 2, 3]]) + ps.Series([['x']])
Traceback (most recent call last):
...
TypeError: Type object was not understood.
```
After the change:
```py
>>> import pyspark.pandas as ps
>>> from pyspark.pandas.config import set_option
>>> set_option("compute.ops_on_diff_frames", True)
>>> ps.Series([[1, 2, 3]]) + ps.Series([[0.4, 0.5]])
0 [1.0, 2.0, 3.0, 0.4, 0.5]
dtype: object
>>> ps.Series([[1, 2, 3]]) + ps.Series([[4, 5]])
0 [1, 2, 3, 4, 5]
dtype: object
>>> ps.Series([[1, 2, 3]]) + ps.Series([['x']])
Traceback (most recent call last):
...
TypeError: Concatenation can only be applied to arrays of the same type
```
### How was this patch tested?
Unit tests.
Closes#32626 from xinrong-databricks/datatypeop_complex.
Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
The main code changes are:
* Change the rule `DemoteBroadcastHashJoin` to `DynamicJoinSelection` and add shuffled hash join selection code.
* Specify the join strategy hint `SHUFFLE_HASH` if AQE thinks a join can be converted to SHJ.
* Skip the `preferSortMerge` config check on the AQE side if a join can be converted to SHJ.
### Why are the changes needed?
Use AQE runtime statistics to decide if we can use shuffled hash join instead of sort merge join. Currently, the formula for shuffled hash join selection does not work due to the dynamic shuffle partition number.
Add a new config `spark.sql.adaptive.shuffledHashJoinLocalMapThreshold` to decide if a join can be converted to shuffled hash join safely.
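A hedged usage sketch of the new config; the threshold value is illustrative and a running `spark` session is assumed:
```py
# Illustrative only: enable AQE and set the new threshold so that a sort merge join
# can be converted to a shuffled hash join at runtime when shuffle partitions are small enough.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.shuffledHashJoinLocalMapThreshold", "64MB")
```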
### Does this PR introduce _any_ user-facing change?
Yes, add a new config.
### How was this patch tested?
Add test.
Closes#32550 from ulysses-you/SPARK-35282-2.
Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Add metrics to record how many tasks fall back to sort-based aggregation during hash aggregation. This will help developers and users debug and optimize queries. Object hash aggregation already has similar metrics.
### Why are the changes needed?
Help developers and users debug and optimize queries that use hash aggregation.
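For context, a hash-aggregation query like the hedged sketch below (the query and a running `spark` session are assumptions) is the kind whose aggregation node in the SQL tab would now expose the fallback metric:
```py
# Illustrative only: a hash aggregation; its node in the Spark web UI's SQL tab would
# report the new metric if any task falls back to sort-based aggregation.
spark.range(0, 10_000_000).selectExpr("id % 100000 AS k").groupBy("k").count().collect()
```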
### Does this PR introduce _any_ user-facing change?
Yes, the added metrics will show up in Spark web UI.
Example:
<img width="604" alt="Screen Shot 2021-05-26 at 12 17 08 AM" src="https://user-images.githubusercontent.com/4629931/119618437-bf3c5880-bdb7-11eb-89bb-5b88db78639f.png">
### How was this patch tested?
Changed unit test in `SQLMetricsSuite.scala`.
Closes#32671 from c21/agg-metrics.
Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR adds a feature for removing the pulled container image after every docker integration test finishes.
This feature is enabled by the new property `spark.test.docker.removePulledImage`.
### Why are the changes needed?
For idempotency.
I'm trying to add docker integration tests to GA in SPARK-35483 (#32631) but I noticed that `jdbc.OracleIntegrationSuite` consistently fails (https://github.com/sarutak/spark/runs/2646707235?check_suite_focus=true).
I investigated the reason and found that the host on GA runs short of storage capacity.
```
ORACLE PASSWORD FOR SYS AND SYSTEM: oracle
The location '/opt/oracle' specified for database files has insufficient space.
Database creation needs at least '4.5GB' disk space.
Specify a different database file destination that has enough space in the configuration file '/etc/sysconfig/oracle-xe-18c.conf'.
mv: cannot stat '/opt/oracle/product/18c/dbhomeXE/dbs/spfileXE.ora': No such file or directory
mv: cannot stat '/opt/oracle/product/18c/dbhomeXE/dbs/orapwXE': No such file or directory
ORACLE_HOME = [/home/oracle] ? ORACLE_BASE environment variable is not being set since this
information is not available for the current user ID .
You can set ORACLE_BASE manually if it is required.
Resetting ORACLE_BASE to its previous value or ORACLE_HOME
The Oracle base remains unchanged with value /opt/oracle
#####################################
########### E R R O R ###############
DATABASE SETUP WAS NOT SUCCESSFUL!
Please check output for further info!
########### E R R O R ###############
#####################################
The following output is now a tail of the alert.log:
tail: cannot open '/opt/oracle/diag/rdbms/*/*/trace/alert*.log' for reading: No such file or directory
tail: no files remaining
```
With this feature, the pulled container image is removed, keeping enough capacity for `jdbc.OracleIntegrationSuite` on GA.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
I confirmed the following things.
* A container image which is absent from the local repository is removed after the test finishes if `spark.test.container.removePulledImage` is `true`.
* A container image which is present in the local repository is not removed after the test finishes, even if `spark.test.container.removePulledImage` is `true`.
* A container image is not removed, regardless of whether it is present in the local repository, if `spark.test.container.removePulledImage` is `false`.
Closes#32652 from sarutak/docker-image-rm.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR fixes `HiveExternalCatalogVersionsSuite`.
With this change, only the `<major>.<minor>` version is set to `spark.sql.hive.metastore.version`.
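For illustration, a hedged sketch of the idea; the variable names and version value are assumptions, not the suite's actual code:
```py
# Illustrative only: pass just <major>.<minor> of the builtin Hive version to the
# downloaded Spark releases, since 3.0.x/3.1.x reject the full "2.3.8".
builtin_hive_version = "2.3.8"
metastore_version = ".".join(builtin_hive_version.split(".")[:2])  # -> "2.3"
```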
### Why are the changes needed?
I'm personally checking whether all the tests pass with Java 11 for the current `master` and I found `HiveExternalCatalogVersionsSuite` fails.
The reason is that Spark 3.0.2 and 3.1.1 don't accept `2.3.8` as a Hive metastore version.
`HiveExternalCatalogVersionsSuite` downloads Spark releases from https://dist.apache.org/repos/dist/release/spark/ and runs tests for each release. The Spark releases are `3.0.2` and `3.1.1` for the current `master` for now.
e47e615c0e/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala (L239-L259)
With Java 11, the suite runs with a Hive metastore version which corresponds to the builtin Hive version, and that is `2.3.8` for the current `master`.
20750a3f9e/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala (L62-L66)
But since `branch-3.0` and `branch-3.1` don't accept `2.3.8`, the suite fails with Java 11.
Another solution would be backporting SPARK-34271 (#31371), but after [a discussion](https://github.com/apache/spark/pull/32668#issuecomment-848435170), we prefer to fix the test.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests with CI.
Closes#32670 from sarutak/fix-version-suite-for-java11.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR aims to upgrade json4s from 3.7.0-M5 to 3.7.0-M11.
Note: json4s versions greater than 3.7.0-M11 are not binary compatible with Spark third-party jars.
### Why are the changes needed?
Multiple defect fixes and improvements, like:
- https://github.com/json4s/json4s/issues/750
- https://github.com/json4s/json4s/issues/554
- https://github.com/json4s/json4s/issues/715
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Ran with the existing UTs
Closes#32636 from vinodkc/br_build_upgrade_json4s.
Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
### What changes were proposed in this pull request?
Add the function type, such as "scala_udf", "python_udf", "java_udf", "hive", "built-in" to the `ExpressionInfo` for UDF.
### Why are the changes needed?
Make the `ExpressionInfo` of UDF more meaningful
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing and newly added UT
Closes#32587 from linhongliu-db/udf-language.
Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR proposes to use proper built-in exceptions instead of the plain `Exception` in Python.
While I am here, I fixed another minor issue in `DataFrame.schema` together:
```diff
- except AttributeError as e:
- raise Exception(
- "Unable to parse datatype from schema. %s" % e)
+ except Exception as e:
+ raise ValueError(
+ "Unable to parse datatype from schema. %s" % e) from e
```
Now it catches all exceptions during schema parsing and chains the original exception to the `ValueError`. Previously it only caught `AttributeError`, which does not cover all cases.
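A minimal, standalone sketch of what the `raise ... from e` pattern gives us; the `AttributeError` trigger below is illustrative:
```py
# Illustrative only: the original error is preserved as the cause, and code that
# caught the old plain Exception still matches, since ValueError subclasses Exception.
try:
    try:
        {}.no_such_attribute  # stand-in for the AttributeError raised while parsing
    except Exception as e:
        raise ValueError("Unable to parse datatype from schema. %s" % e) from e
except ValueError as err:
    assert isinstance(err.__cause__, AttributeError)
    assert isinstance(err, Exception)
```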
### Why are the changes needed?
For users to expect the proper exceptions.
### Does this PR introduce _any_ user-facing change?
Yes, the exception classes became different, but this should be compatible because the previous exception was a plain `Exception`, which the new exceptions inherit from.
### How was this patch tested?
Existing unittests should cover this.
Closes#31238
Closes#32650 from HyukjinKwon/SPARK-32194.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR enables GitHub Actions to test PySpark with Python 3.9.
### Why are the changes needed?
To verify the support of Python 3.9.
### Does this PR introduce _any_ user-facing change?
No, test-only.
### How was this patch tested?
Existing tests should cover.
Closes#32657 from HyukjinKwon/SPARK-35506.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR aims to upgrade joda-time from 2.10.5 to 2.10.10
### Why are the changes needed?
Improvement and bug fixes in joda-time
https://www.joda.org/joda-time/changes-report.html#a2.10.10
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Ran with the existing UTs
Closes#32661 from vinodkc/br_build_upgrade_joda_time.
Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Removes APIs which have been deprecated in Koalas.
### Why are the changes needed?
There are some APIs that have been deprecated in Koalas. We shouldn't have those in pandas APIs on Spark.
### Does this PR introduce _any_ user-facing change?
Yes, the APIs deprecated in Koalas will be no longer available.
### How was this patch tested?
Modified some tests which use the deprecated APIs, and the other existing tests should pass.
Closes#32656 from ueshin/issues/SPARK-35505/remove_deprecated.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
Automatically update the DocSearch version index via release-tag.sh when releasing a new documentation site, instead of the current manual update.
### Why are the changes needed?
Simplify the release process.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manually run the following command and check the diff
```
R_NEXT_VERSION=3.2.0
sed -i".tmp8" "s/'facetFilters':.*$/'facetFilters': [\"version:$R_NEXT_VERSION\"]/g" docs/_config.yml
```
Closes#32662 from gengliangwang/updateDocsearchInRelease.
Authored-by: Gengliang Wang <ltnwgl@gmail.com>
Signed-off-by: Gengliang Wang <ltnwgl@gmail.com>
This PR proposes an add-on to support manually closing entries in MemoryStore and InMemoryRelation.
### What changes were proposed in this pull request?
Currently:
MemoryStore uses a `LinkedHashMap[BlockId, MemoryEntry[_]]` to store all on-heap or off-heap entries.
When `memoryStore.remove(blockId)` is called, the code simply removes one entry from the `LinkedHashMap` and relies on Java GC to do the release work.
This PR:
We propose an add-on to manually close any object stored in MemoryStore and InMemoryRelation if the object extends `AutoCloseable`.
Verification:
In our own use case, we implemented a user-defined off-heap HashRelation for BHJ, and we verified that with this manual close our off-heap HashRelation is released when eviction is triggered.
We also implemented a user-defined CachedBatch, which with this PR is released when `InMemoryRelation.clearCache()` is called.
### Why are the changes needed?
This change helps clean up user-defined off-heap objects that may be cached in InMemoryRelation or MemoryStore.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
WIP
Signed-off-by: Chendi Xue <chendi.xue@intel.com>
Closes#32534 from xuechendi/support_manual_close_in_memorystore.
Authored-by: Chendi Xue <chendi.xue@intel.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
This PR improves the interaction between partition coalescing and skew handling by moving the skew join rule ahead of the partition coalescing rule and making corresponding changes to the two rules:
1. Simplify `OptimizeSkewedJoin` as it doesn't need to handle `CustomShuffleReaderExec` anymore.
2. Update `CoalesceShufflePartitions` to support coalescing non-skewed partitions.
### Why are the changes needed?
It's a bit hard to reason about skew join if partitions have been coalesced. A skewed partition needs to be much larger than other partitions and we need to look at the raw sizes before coalescing.
It also makes `OptimizeSkewedJoin` more robust, as we don't need to worry about a skewed partition being coalesced with a small partition and breaking skew join handling.
It also helps with https://github.com/apache/spark/pull/31653 , which needs to move `OptimizeSkewedJoin` to an earlier phase and run before `CoalesceShufflePartitions`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
new UT and existing tests
Closes#32594 from cloud-fan/shuffle.
Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This patch is a follow-up of SPARK-26848 (#23747). In SPARK-26848, we decided to open the possibility of letting end users set an individual timestamp per partition. But in many cases, specifying a timestamp represents the intention to go back to a specific timestamp and reprocess records, which should be applied to all topics and partitions.
This patch proposes to provide a way to set a global timestamp across the topic-partitions which the source is subscribing to, so that end users can easily set all offsets by a specific timestamp. To make the timestamp easier to configure, the new options only receive a single timestamp for the start/end timestamp.
New options introduced in this PR:
* startingTimestamp
* endingTimestamp
Both options receive the timestamp as a string.
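A hedged usage sketch of the new starting option; the bootstrap servers, topics, epoch-millis value, and a running `spark` session are all illustrative assumptions:
```py
# Illustrative only: one timestamp (epoch millis, as a string) applied to every
# partition of the subscribed topics, instead of a per-partition JSON.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092,host2:9092")
    .option("subscribe", "topic1,topic2")
    .option("startingTimestamp", "1622505600000")
    .load()
)
```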
There are priorities among the options for starting/ending offsets, as we will have three options for start offsets and another three for end offsets. The priorities are as follows:
* starting offsets: startingTimestamp -> startingOffsetsByTimestamp -> startingOffsets
* ending offsets: endingTimestamp -> endingOffsetsByTimestamp -> endingOffsets
### Why are the changes needed?
The existing option to specify a timestamp as an offset is quite verbose if there are a lot of partitions across topics. Suppose there are hundreds of partitions in a topic; the JSON would have to repeat the same timestamp hundreds of times.
Also, the number of partitions can change, which requires either:
* fixing the code if the JSON is statically created, or
* introducing a dependency on the Kafka client and dealing with the Kafka API to craft the JSON programmatically.
Neither approach is acceptable when dealing with an ad-hoc query; nobody wants to write code more complicated than the query itself. Flink [provides the option](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-start-position-configuration) to specify a timestamp for all topic-partitions like this PR does, and doesn't even provide an option to specify the timestamp per topic-partition.
With this PR, end users are only required to provide a single timestamp value. End users no longer need to know the structure of the complicated JSON format.
### Does this PR introduce _any_ user-facing change?
Yes, this PR introduces two new options, described in above section.
Doc changes are following:
![Screenshot 2021-05-21 12.01.02 PM](https://user-images.githubusercontent.com/1317309/119076244-3034e680-ba2d-11eb-8323-0e227932d2e5.png)
![Screenshot 2021-05-21 12.01.12 PM](https://user-images.githubusercontent.com/1317309/119076255-35923100-ba2d-11eb-9d79-538a7f9ee738.png)
![Screenshot 2021-05-21 12.01.24 PM](https://user-images.githubusercontent.com/1317309/119076264-39be4e80-ba2d-11eb-8265-ac158f55c360.png)
![Screenshot 2021-05-21 12.06.01 PM](https://user-images.githubusercontent.com/1317309/119076271-3d51d580-ba2d-11eb-98ea-35fd72b1bbfc.png)
### How was this patch tested?
New UTs covering new functionalities. Also manually tested via simple batch & streaming queries.
Closes#32609 from HeartSaVioR/SPARK-29223-v2.
Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
When a memory reservation triggers a self-spill, `ExecutionMemoryPool#releaseMemory()` will immediately notify waiting tasks that memory has been freed. If there are any waiting tasks with less than 1/2N of the memory pool, they may acquire the newly-freed memory before the current task has a chance to do so. This will cause the original memory reservation to fail. If the initial spill did not release all available memory, the reservation could have been satisfied by asking it to spill again.
This PR adds logic to TaskMemoryManager to detect this case and retry.
### Why are the changes needed?
This bug affects queries with a MemoryConsumer that can spill part of its memory, such as BytesToBytesMap. If the MemoryConsumer is using all available memory and there is a waiting task, then attempting to acquire more memory on the MemoryConsumer will trigger a partial self-spill. However, because the waiting task gets priority, the attempt to acquire memory will fail even if it could have been satisfied by another spill.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a test to MemoryManagerSuite that previously failed and now passes.
Closes#32625 from ankurdave/SPARK-35486.
Authored-by: Ankur Dave <ankurdave@gmail.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
### What changes were proposed in this pull request?
* remove `EliminateUnnecessaryJoin`, using `AQEPropagateEmptyRelation` instead.
* eliminate join, aggregate, limit, repartition, sort, and generate nodes when it is beneficial.
### Why are the changes needed?
Make the elimination previously done by `EliminateUnnecessaryJoin` available in more cases.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add test.
Closes#32602 from ulysses-you/SPARK-35455.
Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR enables plot tests with plotly
```bash
./python/run-tests --python-executables=python3 --modules=pyspark-pandas
```
**Before**:
```
Traceback (most recent call last):
File "/.../miniconda3/envs/python3.8/lib/python3.8/runpy.py", line 194, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/.../miniconda3/envs/python3.8/lib/python3.8/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/.../pyspark/pandas/tests/plot/test_frame_plot_plotly.py", line 42, in <module>
plotly_requirement_message + " Or pandas<1.0; pandas<1.0 does not support latest plotly "
TypeError: unsupported operand type(s) for +: 'NoneType' and 'str'
```
**After**:
```
...
Starting test(python3): pyspark.pandas.tests.plot.test_series_plot_plotly
...
Finished test(python3): pyspark.pandas.tests.plot.test_series_plot_plotly (23s)
...
Tests passed in 1296 seconds
```
### Why are the changes needed?
For test coverage.
### Does this PR introduce _any_ user-facing change?
No, test-only.
### How was this patch tested?
By running the tests.
Closes#32649 from HyukjinKwon/SPARK-35497.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Addressed dongjoon-hyun's comments on the previous PR #30018.
Extended the `RemoveRedundantAggregates` rule to remove redundant aggregations in even more queries. For example, in
```
dataset
.dropDuplicates()
.groupBy('a)
.agg(max('b))
```
the `dropDuplicates` is not needed, because the result of `max` does not depend on duplicate values.
### Why are the changes needed?
Improve performance.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
Closes#31914 from tanelk/SPARK-33122_redundant_aggs_followup.
Lead-authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Co-authored-by: Tanel Kiis <tanel.kiis@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
### What changes were proposed in this pull request?
Add a thread target wrapper API for the PySpark pinned thread mode.
### Why are the changes needed?
A helper method that makes it easier for users to write threading code under pinned thread mode.
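A hedged usage sketch; the helper name `inheritable_thread_target` and a running `spark` session are assumptions here:
```py
import threading
from pyspark import inheritable_thread_target  # helper name assumed

def job():
    # Spark calls in this function run on a JVM thread pinned to this Python thread.
    spark.range(10).count()

# Wrapping the target propagates the current JVM thread-local state (e.g. local
# properties) to the new Python thread under pinned thread mode.
t = threading.Thread(target=inheritable_thread_target(job))
t.start()
t.join()
```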
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual.
Closes#32644 from WeichenXu123/add_thread_target_wrapper_api.
Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR fixes an issue where `RemoveRedundantProjects` removes a `ProjectExec` which is needed for generating `UnsafeRow`s.
In `DataSourceV2Strategy`, `ProjectExec` will be inserted to ensure internal rows are `UnsafeRow`.
```
private def withProjectAndFilter(
project: Seq[NamedExpression],
filters: Seq[Expression],
scan: LeafExecNode,
needsUnsafeConversion: Boolean): SparkPlan = {
val filterCondition = filters.reduceLeftOption(And)
val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
if (withFilter.output != project || needsUnsafeConversion) {
ProjectExec(project, withFilter)
} else {
withFilter
}
}
...
case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) =>
// projection and filters were already pushed down in the optimizer.
// this uses PhysicalOperation to get the projection and ensure that if the batch scan does
// not support columnar, a projection is added to convert the rows to UnsafeRow.
val batchExec = BatchScanExec(relation.output, relation.scan)
withProjectAndFilter(project, filters, batchExec, !batchExec.supportsColumnar) :: Nil
```
So, the hierarchy of the partial tree should be like `ProjectExec(FilterExec(BatchScan))`.
But `RemoveRedundantProjects` doesn't consider this type of hierarchy, leading to a `ClassCastException`.
A concrete example to reproduce this issue was reported:
```
import scala.collection.JavaConverters._
import org.apache.iceberg.{PartitionSpec, TableProperties}
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.spark.sql.{DataFrame, QueryTest, SparkSession}
import org.apache.spark.sql.internal.SQLConf
class RemoveRedundantProjectsTest extends QueryTest {
override val spark: SparkSession = SparkSession
.builder()
.master("local[4]")
.config("spark.driver.bindAddress", "127.0.0.1")
.appName(suiteName)
.getOrCreate()
test("RemoveRedundantProjects removes non-redundant projects") {
withSQLConf(
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED.key -> "true") {
withTempDir { dir =>
val path = dir.getCanonicalPath
val data = spark.range(3).toDF
val table = new HadoopTables().create(
SparkSchemaUtil.convert(data.schema),
PartitionSpec.unpartitioned(),
Map(TableProperties.WRITE_NEW_DATA_LOCATION -> path).asJava,
path)
data.write.format("iceberg").mode("overwrite").save(path)
table.refresh()
val df = spark.read.format("iceberg").load(path)
val dfX = df.as("x")
val dfY = df.as("y")
val join = dfX.filter(dfX("id") > 0).join(dfY, "id")
join.explain("extended")
assert(join.count() == 2)
}
}
}
}
```
```
[info] - RemoveRedundantProjects removes non-redundant projects *** FAILED ***
[info] org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 4) (xeroxms100.northamerica.corp.microsoft.com executor driver): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
[info] at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
[info] at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
```
### Why are the changes needed?
To fix the `ClassCastException` described above, which is caused by removing a `ProjectExec` that is needed to produce `UnsafeRow`s.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New test.
Closes#32606 from sarutak/fix-project-removal-issue.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Use `SpecificInternalRow` instead of `GenericInternalRow` to avoid boxing / unboxing cost.
### Why are the changes needed?
Since it doesn't know the input row schema, `GenericInternalRow` potentially needs to apply boxing to the input arguments. It's better to use `SpecificInternalRow` instead since we know the input data types.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes#32647 from sunchao/specific-input-row.
Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR fixes a bug with subexpression elimination for CaseWhen statements. https://github.com/apache/spark/pull/30245 added support for creating subexpressions that are present in all branches of conditional statements. However, for a statement to be in "all branches" of a CaseWhen statement, it must also be in the elseValue.
### Why are the changes needed?
Fix a bug where a subexpression can be created and run for branches of a conditional that don't pass. This can cause issues especially with a UDF in a branch that gets executed assuming the condition is true.
### Does this PR introduce _any_ user-facing change?
Yes, fixes a potential bug where a UDF could be eagerly executed even though it might expect to have already passed some form of validation. For example:
```
val col = when($"id" < 0, myUdf($"id"))
spark.range(1).select(when(col > 0, col)).show()
```
`myUdf($"id")` is considered a subexpression and eagerly evaluated, because it is pulled out as a common expression from both executions of the when clause, but if `id >= 0` it should never actually be run.
### How was this patch tested?
Updated existing test with new case.
Closes#32595 from Kimahriman/bug-case-subexpr-elimination.
Authored-by: Adam Binford <adamq43@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
### What changes were proposed in this pull request?
As discussed, update SparkR maintainer for future release.
### Why are the changes needed?
Shivaram will not be able to work with this in the future, so we would like to migrate off the maintainer contact email.
shivaram
Closes#32642 from felixcheung/sparkr-maintainer.
Authored-by: Felix Cheung <felixcheung@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Adds more type annotations in the files:
- `python/pyspark/pandas/spark/accessors.py`
- `python/pyspark/pandas/typedef/typehints.py`
- `python/pyspark/pandas/utils.py`
and fixes the mypy check failures.
### Why are the changes needed?
We should enable more `disallow_untyped_defs` mypy checks.
### Does this PR introduce _any_ user-facing change?
Yes.
This PR adds more type annotations in pandas APIs on Spark module, which can impact interaction with development tools for users.
### How was this patch tested?
The mypy check with a new configuration and existing tests should pass.
Closes#32627 from ueshin/issues/SPARK-35467_35468_35477/disallow_untyped_defs.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This patch fixes a bug when dealing with common expressions in conditional expressions such as `CaseWhen` during subexpression elimination.
For example, previously we found common expressions among the conditions of `CaseWhen`, but their child expressions were also counted in. We should not count these child expressions as common expressions.
### Why are the changes needed?
If the redundant children expressions are counted as common expressions too, they will be redundantly evaluated and miss the subexpression elimination opportunity.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added tests.
Closes#32559 from viirya/SPARK-35410.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR aims to upgrade Apache HttpCore from 4.4.12 to 4.4.14.
### Why are the changes needed?
Stability improvements in httpcore 4.4.14
- Bug fix: Non-blocking TLSv1.3 connections can end up in an infinite event spin when closed concurrently by the local and the remote endpoints.
- HTTPCORE-647: Non-blocking connection terminated due to 'java.io.IOException: Broken pipe' can enter an infinite loop flushing buffered output data.
- PR #201, HTTPCORE-634: Fix race condition in AbstractConnPool that can cause internal state corruption
- HTTPCORE-612: DefaultConnectionReuseStrategy incorrectly used int to represent Content-Length value instead of long
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
With Jenkins Tests
Closes#32638 from vinodkc/br_build_upgrade_httpcore.
Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
`spark.blockManager.port` does not work for k8s driver pods now; we should make it work as it does with other cluster managers.
### Why are the changes needed?
`spark.blockManager.port` should be able to work for spark driver pod
### Does this PR introduce _any_ user-facing change?
Yes, `spark.blockManager.port` will be respected if and only if it is present and `spark.driver.blockManager.port` is absent.
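An illustrative sketch of that precedence (port values are examples only):
```py
from pyspark import SparkConf

conf = (
    SparkConf()
    # With this change, honored by the K8s driver pod when the driver-specific key is absent.
    .set("spark.blockManager.port", "7079")
    # .set("spark.driver.blockManager.port", "7078")  # would take precedence if present
)
```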
### How was this patch tested?
new tests
Closes#32639 from yaooqinn/SPARK-35493.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR fixes a test added in SPARK-35226 (#32344).
### Why are the changes needed?
`SELECT 1` seems to be an invalid query for DB2, which requires a FROM clause (e.g. `SELECT 1 FROM SYSIBM.SYSDUMMY1`).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
DB2KrbIntegrationSuite passes on my laptop.
I also confirmed all the KrbIntegrationSuites pass with the following command.
```
build/sbt -Phive -Phive-thriftserver -Pdocker-integration-tests "testOnly org.apache.spark.sql.jdbc.*KrbIntegrationSuite"
```
Closes#32632 from sarutak/followup-SPARK-35226.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR updates SHA for the oracle docker image in the comment in `OracleIntegrationSuite` and `v2.OracleIntegrationSuite`.
The SHA for the latest image is `3f422c4a35b423dfcdbcc57a84f01db6c82eb6c1`
### Why are the changes needed?
The script name for creating the oracle docker image was changed in #32629 following the latest image, so we also need to update the corresponding SHA.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
The value is from `git log`.
Closes#32630 from sarutak/followup-oracle-script-name.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This patch is a followup of SPARK-35463. In SPARK-35463, we output a message to stdout and now we redirect it to stderr.
### Why are the changes needed?
All `echo` statements in `build/mvn` should redirect to stderr if not followed by `exit`, because other scripts use `build/mvn` and capture its stdout. If we don't redirect, we can get invalid output, e.g. "Skipping checksum because shasum is not installed." being picked up as the `commons-cli` version.
### Does this PR introduce _any_ user-facing change?
No. Dev only.
### How was this patch tested?
Manually test on internal system.
Closes#32637 from viirya/fix-build.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR proposes to avoid wrapping the constant literals for `percentage` and `accuracy` in `percentile_approx` with if-else. They are expected to be literals (or foldable expressions).
Pivot works by two-phase aggregations, and it works by manipulating the input to `null` for non-matched values (pivot column and value).
Note that pivot supports an optimized version without such logic by changing the input to `null` for some types (basically non-nested types). So the issue fixed by this PR only affects complex types.
```scala
val df = Seq(
("a", -1.0), ("a", 5.5), ("a", 2.5), ("b", 3.0), ("b", 5.2)).toDF("type", "value")
.groupBy().pivot("type", Seq("a", "b")).agg(
percentile_approx(col("value"), array(lit(0.5)), lit(10000)))
df.show()
```
**Before:**
```
org.apache.spark.sql.AnalysisException: cannot resolve 'percentile_approx((IF((type <=> CAST('a' AS STRING)), value, CAST(NULL AS DOUBLE))), (IF((type <=> CAST('a' AS STRING)), array(0.5D), NULL)), (IF((type <=> CAST('a' AS STRING)), 10000, CAST(NULL AS INT))))' due to data type mismatch: The accuracy or percentage provided must be a constant literal;
'Aggregate [percentile_approx(if ((type#7 <=> cast(a as string))) value#8 else cast(null as double), if ((type#7 <=> cast(a as string))) array(0.5) else cast(null as array<double>), if ((type#7 <=> cast(a as string))) 10000 else cast(null as int), 0, 0) AS a#16, percentile_approx(if ((type#7 <=> cast(b as string))) value#8 else cast(null as double), if ((type#7 <=> cast(b as string))) array(0.5) else cast(null as array<double>), if ((type#7 <=> cast(b as string))) 10000 else cast(null as int), 0, 0) AS b#18]
+- Project [_1#2 AS type#7, _2#3 AS value#8]
+- LocalRelation [_1#2, _2#3]
```
**After:**
```
+-----+-----+
| a| b|
+-----+-----+
|[2.5]|[3.0]|
+-----+-----+
```
### Why are the changes needed?
To make percentile_approx work with pivot as expected
### Does this PR introduce _any_ user-facing change?
Yes. It threw an exception but now it returns a correct result as shown above.
### How was this patch tested?
Manually tested and unit test was added.
Closes#32619 from HyukjinKwon/SPARK-35480.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR aims to upgrade ORC to 1.6.8.
### Why are the changes needed?
This will bring the latest bug fixes.
- https://orc.apache.org/news/2021/05/21/ORC-1.6.8/
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the existing CIs.
Closes#32635 from dongjoon-hyun/SPARK-35489.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR aims to upgrade ASM to 7.3.1.
- https://issues.apache.org/jira/browse/XBEAN-323
- https://asm.ow2.io/versions.html
### Why are the changes needed?
ASM 7.3.1 brings the following changes:
- new V15 constant
- experimental support for PermittedSubtypes and RecordComponent
- bug fixes
  - 317885: SKIP_DEBUG now skips MethodParameters attributes
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Ran with the existing UTs
Closes#32634 from vinodkc/br_build_upgrade_asm.
Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
### What changes were proposed in this pull request?
This PR changes the name in the comment in `jdbc.OracleIntegrationSuite` and `v2.OracleIntegrationSuite`.
The script is for creating oracle docker image.
### Why are the changes needed?
The name of the script is `buildContainerImage`, not `buildDockerImage` now.
- d918f5a4c6 (diff-be303ab32e74192aca829e5ea259a0aec07aac23a6049120fb337ec4efa601b0)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
I confirmed that I can build the image with `./buildContainerImage.sh -v 18.4.0 -x`.
Closes#32629 from sarutak/change-oracle-container-script-name.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR upgrades Dropwizard metrics to 4.2.0.
I also modified the corresponding links in `docs/monitoring.md`.
### Why are the changes needed?
The latest version was released last week and it contains some improvements.
https://github.com/dropwizard/metrics/releases/tag/v4.2.0
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Build succeeds and all the modified links are reachable.
Closes#32628 from sarutak/upgrade-dropwizard-4.2.0.
Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/32411, to fix a mistake and use `sparkSession.sessionState.newHadoopConf`, which includes SQL configs, instead of `sparkSession.sparkContext.hadoopConfiguration`.
### Why are the changes needed?
fix mistake
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing tests
Closes#32618 from cloud-fan/follow1.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Kent Yao <yao@apache.org>
### What changes were proposed in this pull request?
Sets up the `mypy` configuration to enable `disallow_untyped_defs` check for pandas APIs on Spark module.
### Why are the changes needed?
Currently, many functions in the main code of the pandas APIs on Spark module are still missing type annotations, and the `mypy` check `disallow_untyped_defs` is disabled for them.
We should add more type annotations and enable the mypy check.
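For illustration, the kind of change this check drives; the function name, types, and body below are hypothetical:
```py
from typing import List

# Hypothetical helper: under disallow_untyped_defs, mypy rejects it unless the
# signature is fully annotated as shown here.
def common_index(this: List[int], that: List[int]) -> List[int]:
    """Return the index values present in both inputs (illustrative body)."""
    return sorted(set(this) & set(that))
```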
### Does this PR introduce _any_ user-facing change?
Yes.
This PR adds more type annotations in pandas APIs on Spark module, which can impact interaction with development tools for users.
### How was this patch tested?
The mypy check with a new configuration and existing tests should pass.
Closes#32614 from ueshin/issues/SPARK-35465/disallow_untyped_defs.
Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
### What changes were proposed in this pull request?
This patch sorts equivalent expressions based on their child-parent relation.
### Why are the changes needed?
`EquivalentExpressions` maintains a map of equivalent expressions. It is a `HashMap` now, so the insertion order is not guaranteed to be preserved later. Subexpression elimination relies on retrieving subexpressions from the map. If there are child-parent relationships among the subexpressions, we want the child expressions to come before the parent expressions, so we can replace child expressions in parent expressions with subexpression evaluation.
For example, we have two different expressions `add = Add(Literal(1), Literal(2))` and `Add(Literal(3), add)`.
Case 1: child subexpr comes first.
```scala
addExprTree(add)
addExprTree(Add(Literal(3), add))
addExprTree(Add(Literal(3), add))
```
Case 2: parent subexpr comes first. For this case, we need to sort equivalent expressions.
```
addExprTree(Add(Literal(3), add)) => We add `Add(Literal(3), add)` into the map first, then add `add` into the map
addExprTree(add)
addExprTree(Add(Literal(3), add))
```
Since we are going to sort equivalent expressions anyway, we don't need a `LinkedHashMap`; we just do the sorting.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added tests.
Closes#32586 from viirya/use-listhashmap.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/32622 to fix a test case.
### Why are the changes needed?
Fix a wrong test case name and fix the test case to cause the expected error correctly.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs.
Closes#32623 from dongjoon-hyun/SPARK-34558.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/31671.
https://github.com/apache/spark/pull/31671 qualifies the warehouse path at the beginning, which may fail Spark startup if something goes wrong, e.g. the underlying FileSystem can't be initialized.
This PR falls back to the old behavior and leave the warehouse path unqualified if qualifying fails.
### Why are the changes needed?
Fix a regression. It's important to be always able to start Spark app (e.g. spark-shell), so that we can debug.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
a new test case
Closes#32622 from cloud-fan/follow2.
Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Most Spark conf keys are case-sensitive, including `spark.blockManager.port`; we cannot get the correct port number with `spark.blockmanager.port`.
This PR changes the wrong key to `spark.blockManager.port` in `BasicExecutorFeatureStep`.
This PR also ensures a fast fail when the port value is invalid for executor containers. When 0 is specified (it is valid as a random port, but invalid in a k8s request), it should not be put in the `containerPort` field of the executor pod description. We do not expect executor pods to continuously fail to be created because of invalid requests.
### Why are the changes needed?
bugfix
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
new tests
Closes#32621 from yaooqinn/SPARK-35482.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Change the type of `DATASET_ID_TAG` from `Long` to `HashSet[Long]` to allow the logical plan to match multiple datasets.
### Why are the changes needed?
During the transformation from one Dataset to another, the DATASET_ID_TAG of the logical plan won't change if the plan itself doesn't change:
b5241c97b1/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (L234-L237)
However, dataset id always changes even if the logical plan doesn't change:
b5241c97b1/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (L207-L208)
And this can lead to a mismatch between the dataset's id and the column's `__dataset_id`. E.g.,
```scala
test("SPARK-28344: fail ambiguous self join - Dataset.colRegex as column ref") {
// The test can fail if we change it to:
// val df1 = spark.range(3).toDF()
// val df2 = df1.filter($"id" > 0).toDF()
val df1 = spark.range(3)
val df2 = df1.filter($"id" > 0)
withSQLConf(
SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true",
SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
assertAmbiguousSelfJoin(df1.join(df2, df1.colRegex("id") > df2.colRegex("id")))
}
}
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests.
Closes#32616 from Ngone51/fix-ambiguous-join.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>