spark-instrumented-optimizer/python
Hyukjin Kwon 26ae9e93da [SPARK-36559][SQL][PYTHON] Create plans dedicated to distributed-sequence index for optimization
### What changes were proposed in this pull request?

This PR proposes to move distributed-sequence index implementation to SQL plan to leverage optimizations such as column pruning.

```python
import pyspark.pandas as ps
ps.set_option('compute.default_index_type', 'distributed-sequence')
ps.range(10).id.value_counts().to_frame().spark.explain()
```

**Before:**

```bash
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#51L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(count#51L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#70]
      +- HashAggregate(keys=[id#37L], functions=[count(1)], output=[__index_level_0__#48L, count#51L])
         +- Exchange hashpartitioning(id#37L, 200), ENSURE_REQUIREMENTS, [id=#67]
            +- HashAggregate(keys=[id#37L], functions=[partial_count(1)], output=[id#37L, count#63L])
               +- Project [id#37L]
                  +- Filter atleastnnonnulls(1, id#37L)
                     +- Scan ExistingRDD[__index_level_0__#36L,id#37L]
                        # ^^^ Base DataFrame created by the output RDD from zipWithIndex (and checkpointed)
```

**After:**

```bash
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#275L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(count#275L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#174]
      +- HashAggregate(keys=[id#258L], functions=[count(1)])
         +- HashAggregate(keys=[id#258L], functions=[partial_count(1)])
            +- Filter atleastnnonnulls(1, id#258L)
               +- Range (0, 10, step=1, splits=16)
                  # ^^^ Removed the Spark job execution for `zipWithIndex`
```

### Why are the changes needed?

To leverage optimization of SQL engine and avoid unnecessary shuffle to create default index.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unittests were added. Also, this PR will test all unittests in pandas API on Spark after switching the default index implementation to `distributed-sequence`.

Closes #33807 from HyukjinKwon/SPARK-36559.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 93cec49212)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2021-08-25 10:03:00 +09:00
..
docs Revert "[SPARK-34415][ML] Randomization in hyperparameter optimization" 2021-08-24 13:39:29 -07:00
lib [SPARK-34688][PYTHON] Upgrade to Py4J 0.10.9.2 2021-03-11 09:51:41 -06:00
pyspark [SPARK-36559][SQL][PYTHON] Create plans dedicated to distributed-sequence index for optimization 2021-08-25 10:03:00 +09:00
test_coverage [SPARK-36092][INFRA][BUILD][PYTHON] Migrate to GitHub Actions with Codecov from Jenkins 2021-08-01 21:38:39 +09:00
test_support Spelling r common dev mlib external project streaming resource managers python 2020-11-27 10:22:45 -06:00
.coveragerc [SPARK-7721][PYTHON][TESTS] Adds PySpark coverage generation script 2018-01-22 22:12:50 +09:00
.gitignore [SPARK-3946] gitignore in /python includes wrong directory 2014-10-14 14:09:39 -07:00
MANIFEST.in [SPARK-32714][PYTHON] Initial pyspark-stubs port 2020-09-24 14:15:36 +09:00
mypy.ini [SPARK-35684][INFRA][PYTHON] Bump up mypy version in GitHub Actions 2021-07-07 13:26:41 +09:00
pylintrc [SPARK-32435][PYTHON] Remove heapq3 port from Python 3 2020-07-27 20:10:13 +09:00
README.md [SPARK-30884][PYSPARK] Upgrade to Py4J 0.10.9 2020-02-20 09:09:30 -08:00
run-tests [SPARK-29672][PYSPARK] update spark testing framework to use python3 2019-11-14 10:18:55 -08:00
run-tests-with-coverage [SPARK-36092][INFRA][BUILD][PYTHON] Migrate to GitHub Actions with Codecov from Jenkins 2021-08-01 21:38:39 +09:00
run-tests.py [SPARK-32194][PYTHON] Use proper exception classes instead of plain Exception 2021-05-26 11:54:40 +09:00
setup.cfg [SPARK-1267][SPARK-18129] Allow PySpark to be pip installed 2016-11-16 14:22:15 -08:00
setup.py [SPARK-35759][PYTHON] Remove the upperbound for numpy for pandas-on-Spark 2021-06-15 09:59:05 +09:00

Apache Spark

Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for stream processing.

https://spark.apache.org/

Online Documentation

You can find the latest Spark documentation, including a programming guide, on the project web page

Python Packaging

This README file only contains basic information related to pip installed PySpark. This packaging is currently experimental and may change in future versions (although we will do our best to keep compatibility). Using PySpark requires the Spark JARs, and if you are building this from source please see the builder instructions at "Building Spark".

The Python packaging for Spark is not intended to replace all of the other use cases. This Python packaged version of Spark is suitable for interacting with an existing cluster (be it Spark standalone, YARN, or Mesos) - but does not contain the tools required to set up your own standalone Spark cluster. You can download the full version of Spark from the Apache Spark downloads page.

NOTE: If you are using this with a Spark standalone cluster you must ensure that the version (including minor version) matches or you may experience odd errors.

Python Requirements

At its core PySpark depends on Py4J, but some additional sub-packages have their own extra requirements for some features (including numpy, pandas, and pyarrow).