[SQL] When creating partitioned table scan, explicitly create UnionRDD.
Otherwise, it will cause stack overflow when there are many partitions.
Author: Yin Huai <yhuai@databricks.com>
Closes #6162 from yhuai/partitionUnionedRDD and squashes the following commits:
fa016d8 [Yin Huai] Explicitly create UnionRDD.
(cherry picked from commit e8f0e016ea)
Signed-off-by: Cheng Lian <lian@databricks.com>
This commit is contained in:
parent
bac45229aa
commit
7aa269f4bb
|
@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path
|
||||||
|
|
||||||
import org.apache.spark.Logging
|
import org.apache.spark.Logging
|
||||||
import org.apache.spark.deploy.SparkHadoopUtil
|
import org.apache.spark.deploy.SparkHadoopUtil
|
||||||
import org.apache.spark.rdd.RDD
|
import org.apache.spark.rdd.{UnionRDD, RDD}
|
||||||
import org.apache.spark.sql.Row
|
import org.apache.spark.sql.Row
|
||||||
import org.apache.spark.sql.catalyst.expressions
|
import org.apache.spark.sql.catalyst.expressions
|
||||||
import org.apache.spark.sql.catalyst.expressions._
|
import org.apache.spark.sql.catalyst.expressions._
|
||||||
|
@ -169,9 +169,12 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
|
||||||
scan.execute()
|
scan.execute()
|
||||||
}
|
}
|
||||||
|
|
||||||
val unionedRows = perPartitionRows.reduceOption(_ ++ _).getOrElse {
|
val unionedRows =
|
||||||
relation.sqlContext.emptyResult
|
if (perPartitionRows.length == 0) {
|
||||||
}
|
relation.sqlContext.emptyResult
|
||||||
|
} else {
|
||||||
|
new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows)
|
||||||
|
}
|
||||||
|
|
||||||
createPhysicalRDD(logicalRelation.relation, output, unionedRows)
|
createPhysicalRDD(logicalRelation.relation, output, unionedRows)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue