spark-instrumented-optimizer/sql/core
Terry Kim 1c781a4354 [SPARK-32282][SQL] Improve EnsureRquirement.reorderJoinKeys to handle more scenarios such as PartitioningCollection
### What changes were proposed in this pull request?

This PR proposes to improve  `EnsureRquirement.reorderJoinKeys` to handle the following scenarios:
1. If the keys cannot be reordered to match the left-side `HashPartitioning`, consider the right-side `HashPartitioning`.
2. Handle `PartitioningCollection`, which may contain `HashPartitioning`

### Why are the changes needed?

1. For the scenario 1), the current behavior matches either the left-side `HashPartitioning` or the right-side `HashPartitioning`. This means that if both sides are `HashPartitioning`, it will try to match only the left side.
The following will not consider the right-side `HashPartitioning`:
```
val df1 = (0 until 10).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 10).map(i => (i % 7, i % 11)).toDF("i2", "j2")
df1.write.format("parquet").bucketBy(4, "i1", "j1").saveAsTable("t1")df2.write.format("parquet").bucketBy(4, "i2", "j2").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val join = t1.join(t2, t1("i1") === t2("j2") && t1("i1") === t2("i2"))
 join.explain

== Physical Plan ==
*(5) SortMergeJoin [i1#26, i1#26], [j2#31, i2#30], Inner
:- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#69]
:     +- *(1) Project [i1#26, j1#27]
:        +- *(1) Filter isnotnull(i1#26)
:           +- *(1) ColumnarToRow
:              +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4
+- *(4) Sort [j2#31 ASC NULLS FIRST, i2#30 ASC NULLS FIRST], false, 0.
   +- Exchange hashpartitioning(j2#31, i2#30, 4), true, [id=#79].       <===== This can be removed
      +- *(3) Project [i2#30, j2#31]
         +- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30))
            +- *(3) ColumnarToRow
               +- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4

```

2.  For the scenario 2), the current behavior does not handle `PartitioningCollection`:
```
val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("i2", "j2")
val df3 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i3", "j3")
val join = df1.join(df2, df1("i1") === df2("i2") && df1("j1") === df2("j2")) // PartitioningCollection
val join2 = join.join(df3, join("j1") === df3("j3") && join("i1") === df3("i3"))
join2.explain

== Physical Plan ==
*(9) SortMergeJoin [j1#8, i1#7], [j3#30, i3#29], Inner
:- *(6) Sort [j1#8 ASC NULLS FIRST, i1#7 ASC NULLS FIRST], false, 0.       <===== This can be removed
:  +- Exchange hashpartitioning(j1#8, i1#7, 5), true, [id=#58]             <===== This can be removed
:     +- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner
:        :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0
:        :  +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#45]
:        :     +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8]
:        :        +- *(1) LocalTableScan [_1#2, _2#3]
:        +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0
:           +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#51]
:              +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19]
:                 +- *(3) LocalTableScan [_1#13, _2#14]
+- *(8) Sort [j3#30 ASC NULLS FIRST, i3#29 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(j3#30, i3#29, 5), true, [id=#64]
      +- *(7) Project [_1#24 AS i3#29, _2#25 AS j3#30]
         +- *(7) LocalTableScan [_1#24, _2#25]
```
### Does this PR introduce _any_ user-facing change?

Yes, now from the above examples, the shuffle/sort nodes pointed by `This can be removed` are now removed:
1. Senario 1):
```
== Physical Plan ==
*(4) SortMergeJoin [i1#26, i1#26], [i2#30, j2#31], Inner
:- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#67]
:     +- *(1) Project [i1#26, j1#27]
:        +- *(1) Filter isnotnull(i1#26)
:           +- *(1) ColumnarToRow
:              +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4
+- *(3) Sort [i2#30 ASC NULLS FIRST, j2#31 ASC NULLS FIRST], false, 0
   +- *(3) Project [i2#30, j2#31]
      +- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30))
         +- *(3) ColumnarToRow
            +- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4
```
2. Scenario 2):
```
== Physical Plan ==
*(8) SortMergeJoin [i1#7, j1#8], [i3#29, j3#30], Inner
:- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner
:  :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0
:  :  +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#43]
:  :     +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8]
:  :        +- *(1) LocalTableScan [_1#2, _2#3]
:  +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0
:     +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#49]
:        +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19]
:           +- *(3) LocalTableScan [_1#13, _2#14]
+- *(7) Sort [i3#29 ASC NULLS FIRST, j3#30 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i3#29, j3#30, 5), true, [id=#58]
      +- *(6) Project [_1#24 AS i3#29, _2#25 AS j3#30]
         +- *(6) LocalTableScan [_1#24, _2#25]
```

### How was this patch tested?

Added tests.

Closes #29074 from imback82/reorder_keys.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-10-08 04:58:41 +00:00
..
benchmarks [SPARK-32511][SQL] Add dropFields method to Column class 2020-10-06 08:53:30 +00:00
src [SPARK-32282][SQL] Improve EnsureRquirement.reorderJoinKeys to handle more scenarios such as PartitioningCollection 2020-10-08 04:58:41 +00:00
pom.xml [SPARK-20202][BUILD][SQL] Remove references to org.spark-project.hive (Hive 1.2.1) 2020-10-05 15:29:56 -07:00