[SPARK-15247][SQL] Set the default number of partitions for reading parquet schemas
## What changes were proposed in this pull request?

This PR sets the default number of partitions when reading parquet schemas. `SQLContext#read#parquet` currently yields at least `n_executors * n_cores` tasks even if the parquet data consists of a single small file, which increases latency for small jobs.

## How was this patch tested?

Manually tested and checked.

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #13137 from maropu/SPARK-15247.
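The fix caps the schema-read parallelism at the number of input files (but at least one task, and never above the cluster's `defaultParallelism`). A minimal sketch of that cap as a standalone function (the name `schemaReadParallelism` is hypothetical; the actual patch inlines this expression in `ParquetFileFormat`):

```scala
object SchemaReadParallelismSketch {
  // Never more tasks than input files, never fewer than one,
  // and never above the cluster's default parallelism.
  def schemaReadParallelism(numFiles: Int, defaultParallelism: Int): Int =
    math.min(math.max(numFiles, 1), defaultParallelism)

  def main(args: Array[String]): Unit = {
    // A single small parquet file on an 8-slot cluster now yields 1 task, not 8.
    println(schemaReadParallelism(1, 8))
    // Many files are still capped at defaultParallelism.
    println(schemaReadParallelism(100, 8))
  }
}
```

Before this change, `sc.parallelize(partialFileStatusInfo)` fell back to `defaultParallelism` slices regardless of how few files there were, so even a one-file read scheduled `n_executors * n_cores` tasks.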
Commit dae4d5db21 (parent bd39ffe35c)
```diff
@@ -794,11 +794,16 @@ private[sql] object ParquetFileFormat extends Logging {
       // side, and resemble fake `FileStatus`es there.
       val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))
 
+      // Set the number of partitions to prevent following schema reads from generating many tasks
+      // in case of a small number of parquet files.
+      val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
+        sparkSession.sparkContext.defaultParallelism)
+
       // Issues a Spark job to read Parquet schema in parallel.
       val partiallyMergedSchemas =
         sparkSession
           .sparkContext
-          .parallelize(partialFileStatusInfo)
+          .parallelize(partialFileStatusInfo, numParallelism)
           .mapPartitions { iterator =>
             // Resembles fake `FileStatus`es with serialized path and length information.
             val fakeFileStatuses = iterator.map { case (path, length) =>
```
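The effect of passing `numParallelism` to `parallelize` can be observed directly on an RDD's partition count. A hedged sketch against a local Spark session (the file names and lengths below are made-up illustration values):

```scala
import org.apache.spark.sql.SparkSession

object ParallelizeCapDemo {
  def main(args: Array[String]): Unit = {
    // local[8] gives defaultParallelism = 8 in this sketch.
    val spark = SparkSession.builder()
      .master("local[8]")
      .appName("schema-read-parallelism-demo")
      .getOrCreate()
    val sc = spark.sparkContext

    // Two small files' (path, length) pairs, mirroring partialFileStatusInfo.
    val fileInfo = Seq(("part-00000.parquet", 1024L), ("part-00001.parquet", 2048L))

    // Without an explicit slice count, parallelize uses defaultParallelism.
    println(sc.parallelize(fileInfo).getNumPartitions)       // 8

    // With the patch's cap, the job is sized to the number of files.
    val numParallelism = math.min(math.max(fileInfo.size, 1), sc.defaultParallelism)
    println(sc.parallelize(fileInfo, numParallelism).getNumPartitions)  // 2

    spark.stop()
  }
}
```

Two tasks touch two files with no idle scheduling overhead, which is the latency win the PR description claims for small jobs.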