dff73bfa5e
## What changes were proposed in this pull request? This PR improves `CollapseRepartition` to optimize the adjacent combinations of **Repartition** and **RepartitionBy**. Also, this PR adds a testsuite for this optimizer. **Target Scenario** ```scala scala> val dsView1 = spark.range(8).repartition(8, $"id") scala> dsView1.createOrReplaceTempView("dsView1") scala> sql("select id from dsView1 distribute by id").explain(true) ``` **Before** ```scala scala> sql("select id from dsView1 distribute by id").explain(true) == Parsed Logical Plan == 'RepartitionByExpression ['id] +- 'Project ['id] +- 'UnresolvedRelation `dsView1` == Analyzed Logical Plan == id: bigint RepartitionByExpression [id#0L] +- Project [id#0L] +- SubqueryAlias dsview1 +- RepartitionByExpression [id#0L], 8 +- Range (0, 8, splits=8) == Optimized Logical Plan == RepartitionByExpression [id#0L] +- RepartitionByExpression [id#0L], 8 +- Range (0, 8, splits=8) == Physical Plan == Exchange hashpartitioning(id#0L, 200) +- Exchange hashpartitioning(id#0L, 8) +- *Range (0, 8, splits=8) ``` **After** ```scala scala> sql("select id from dsView1 distribute by id").explain(true) == Parsed Logical Plan == 'RepartitionByExpression ['id] +- 'Project ['id] +- 'UnresolvedRelation `dsView1` == Analyzed Logical Plan == id: bigint RepartitionByExpression [id#0L] +- Project [id#0L] +- SubqueryAlias dsview1 +- RepartitionByExpression [id#0L], 8 +- Range (0, 8, splits=8) == Optimized Logical Plan == RepartitionByExpression [id#0L] +- Range (0, 8, splits=8) == Physical Plan == Exchange hashpartitioning(id#0L, 200) +- *Range (0, 8, splits=8) ``` ## How was this patch tested? Pass the Jenkins tests (including a new testsuite). Author: Dongjoon Hyun <dongjoon@apache.org> Closes #13765 from dongjoon-hyun/SPARK-16052. |
||
---|---|---|
.. | ||
ml | ||
mllib | ||
sql | ||
streaming | ||
__init__.py | ||
accumulators.py | ||
broadcast.py | ||
cloudpickle.py | ||
conf.py | ||
context.py | ||
daemon.py | ||
files.py | ||
heapq3.py | ||
java_gateway.py | ||
join.py | ||
profiler.py | ||
rdd.py | ||
rddsampler.py | ||
resultiterable.py | ||
serializers.py | ||
shell.py | ||
shuffle.py | ||
statcounter.py | ||
status.py | ||
storagelevel.py | ||
tests.py | ||
traceback_utils.py | ||
worker.py |