spark-instrumented-optimizer/sql/core
Kris Mok 56448c6623 [SPARK-26352][SQL] join reorder should not change the order of output attributes
## What changes were proposed in this pull request?

The optimizer rule `org.apache.spark.sql.catalyst.optimizer.ReorderJoin` performs join reordering on inner joins. This was introduced from SPARK-12032 (https://github.com/apache/spark/pull/10073) in 2015-12.

After it had reordered the joins, though, it didn't check whether or not the output attribute order is still the same as before. Thus, it's possible to have a mismatch between the reordered output attributes order vs the schema that a DataFrame thinks it has.
The same problem exists in the CBO version of join reordering (`CostBasedJoinReorder`) too.

This can be demonstrated with the example:
```scala
spark.sql("create table table_a (x int, y int) using parquet")
spark.sql("create table table_b (i int, j int) using parquet")
spark.sql("create table table_c (a int, b int) using parquet")
val df = spark.sql("""
  with df1 as (select * from table_a cross join table_b)
  select * from df1 join table_c on a = x and b = i
""")
```
here's what the DataFrame thinks:
```
scala> df.printSchema
root
 |-- x: integer (nullable = true)
 |-- y: integer (nullable = true)
 |-- i: integer (nullable = true)
 |-- j: integer (nullable = true)
 |-- a: integer (nullable = true)
 |-- b: integer (nullable = true)
```
here's what the optimized plan thinks, after join reordering:
```
scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- ${a.name}: ${a.dataType.typeName}"))
|-- x: integer
|-- y: integer
|-- a: integer
|-- b: integer
|-- i: integer
|-- j: integer
```

If we exclude the `ReorderJoin` rule (using Spark 2.4's optimizer rule exclusion feature), it's back to normal:
```
scala> spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ReorderJoin")

scala> val df = spark.sql("with df1 as (select * from table_a cross join table_b) select * from df1 join table_c on a = x and b = i")
df: org.apache.spark.sql.DataFrame = [x: int, y: int ... 4 more fields]

scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- ${a.name}: ${a.dataType.typeName}"))
|-- x: integer
|-- y: integer
|-- i: integer
|-- j: integer
|-- a: integer
|-- b: integer
```

Note that this output attribute ordering problem leads to data corruption, and can manifest itself in various symptoms:
* Silently corrupting data, if the reordered columns happen to either have matching types or have sufficiently-compatible types (e.g. all fixed length primitive types are considered as "sufficiently compatible" in an `UnsafeRow`), then only the resulting data is going to be wrong but it might not trigger any alarms immediately. Or
* Weird Java-level exceptions like `java.lang.NegativeArraySizeException`, or even SIGSEGVs.

## How was this patch tested?

Added new unit test in `JoinReorderSuite` and new end-to-end test in `JoinSuite`.
Also made `JoinReorderSuite` and `StarJoinReorderSuite` assert more strongly on maintaining output attribute order.

Closes #23303 from rednaxelafx/fix-join-reorder.

Authored-by: Kris Mok <rednaxelafx@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-12-17 13:41:20 +08:00
..
benchmarks [SPARK-26337][SQL][TEST] Add benchmark for LongToUnsafeRowMap 2018-12-14 10:50:48 +08:00
src [SPARK-26352][SQL] join reorder should not change the order of output attributes 2018-12-17 13:41:20 +08:00
pom.xml [SPARK-25956] Make Scala 2.12 as default Scala version in Spark 3.0 2018-11-14 16:22:23 -08:00