## What changes were proposed in this pull request?
### What's problem?
In some cases, sub scalar query could throw a NPE, which is caused in execution side.
```
java.lang.NullPointerException
at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:169)
at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:526)
at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:159)
at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$3.apply(QueryPlan.scala:225)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$3.apply(QueryPlan.scala:225)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:225)
at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:258)
at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:58)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
at scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:364)
at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40)
at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:139)
at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:135)
at scala.collection.mutable.HashMap.findEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.get(HashMap.scala:70)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:56)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:97)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:98)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:98)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:98)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1102)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1154)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:270)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:319)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:308)
at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:181)
at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:71)
at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:70)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
### How does this happen?
Here looks what happen now:
1. Sub scalar query was made (for instance `SELECT (SELECT id FROM foo)`).
2. Try to extract some common expressions (via `CodeGenerator.subexpressionElimination`) so that it can generates some common codes and can be reused.
3. During this, seems it extracts some expressions that can be reused (via `EquivalentExpressions.addExprTree`)
b2deef64f6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala (L1102)
4. During this, if the hash (`EquivalentExpressions.Expr.hashCode`) happened to be the same at `EquivalentExpressions.addExpr` anyhow, `EquivalentExpressions.Expr.equals` is called to identify object in the same hash, which eventually calls `semanticEquals` in `ScalarSubquery`
087879a77a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala (L54)087879a77a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala (L36)
5. `ScalarSubquery`'s `semanticEquals` needs `SubqueryExec`'s `sameResult`
77a2fc5b52/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala (L58)
6. `SubqueryExec`'s `sameResult` requires a canonicalized plan which calls `FileSourceScanExec`'s `doCanonicalize`
e008ad1752/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala (L258)
7. In `FileSourceScanExec`'s `doCanonicalize`, `FileSourceScanExec`'s `relation` is required but seems `transient` so it becomes `null`.
e76b0124fb/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala (L527)e76b0124fb/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala (L160)
8. NPE is thrown.
\*1. driver side
\*2., 3., 4., 5., 6., 7., 8. executor side
Note that most of cases, it looks fine because we will usually call:
087879a77a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala (L40)
which make a canonicalized plan via:
b045315e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala (L192)77a2fc5b52/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala (L52)
### How to reproduce?
This looks what happened now. I can reproduce this by a bit of messy way:
```diff
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
index 8d06804ce1e..d25fc9a7ba9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
-37,7 +37,9 class EquivalentExpressions {
case _ => false
}
- override def hashCode: Int = e.semanticHash()
+ override def hashCode: Int = {
+ 1
+ }
}
```
```scala
spark.range(1).write.mode("overwrite").parquet("/tmp/foo")
spark.read.parquet("/tmp/foo").createOrReplaceTempView("foo")
spark.conf.set("spark.sql.codegen.wholeStage", false)
sql("SELECT (SELECT id FROM foo) == (SELECT id FROM foo)").collect()
```
### How does this PR fix?
- Make all variables that access to `FileSourceScanExec`'s `relation` as `lazy val` so that we avoid NPE. This is a temporary fix.
- Allow `makeCopy` in `SparkPlan` without Spark session too. This looks still able to be accessed within executor side. For instance:
```
at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:70)
at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:47)
at org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:233)
at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:243)
at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:258)
at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:58)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
at scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:364)
at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40)
at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:139)
at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:135)
at scala.collection.mutable.HashMap.findEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.get(HashMap.scala:70)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:54)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:95)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:96)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1102)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1154)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:270)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:319)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:308)
at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:181)
at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:71)
at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:70)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
This PR takes over https://github.com/apache/spark/pull/20856.
## How was this patch tested?
Manually tested and unit test was added.
Closes#20856
Author: hyukjinkwon <gurwls223@apache.org>
Closes#21815 from HyukjinKwon/SPARK-23731.