[SPARK-29454][SQL] Reduce unsafeProjection times when read Parquet file use non-vectorized mode

### What changes were proposed in this pull request?

There are two `unsafeProjection` conversions when we read a Parquet data file in non-vectorized mode:

1. `ParquetGroupConverter` calls an `unsafeProjection` to convert the `SpecificInternalRow` to an `UnsafeRow` for every record read through `ParquetRecordReader`.

2. `ParquetFileFormat` then calls another `unsafeProjection` to convert that `UnsafeRow` into yet another `UnsafeRow`: on the DataSourceV1 path this happens whenever `partitionSchema` is not empty, and on the DataSourceV2 path `PartitionReaderWithPartitionValues` always performs this conversion.

This PR removes the `unsafeProjection` conversion from `ParquetGroupConverter` and changes `ParquetRecordReader` to produce a `SpecificInternalRow` instead of an `UnsafeRow`.
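To make the redundancy concrete, here is a minimal, self-contained sketch of the two code paths using Spark's Catalyst row and projection classes. This is not code from the PR; the schemas, values, and object name are made up for illustration:

```scala
import org.apache.spark.sql.catalyst.expressions.{JoinedRow, SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

object ProjectionSketch {
  def main(args: Array[String]): Unit = {
    val dataSchema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
    val partSchema = StructType(Seq(StructField("dt", IntegerType)))
    val fullSchema = StructType(dataSchema ++ partSchema)

    // The mutable row that the Parquet row converter fills in per record.
    val currentRow = new SpecificInternalRow(dataSchema.map(_.dataType))
    currentRow.setLong(0, 1L)
    currentRow.update(1, UTF8String.fromString("a"))

    val partitionValues = new SpecificInternalRow(partSchema.map(_.dataType))
    partitionValues.setInt(0, 20191015)

    val toUnsafe = UnsafeProjection.create(dataSchema)               // projection #1, per record
    val appendPartitionColumns = UnsafeProjection.create(fullSchema) // projection #2, per record

    // Before: the converter copies currentRow into an UnsafeRow, then the file
    // source copies again while appending partition columns -- two copies per record.
    val twoCopies =
      appendPartitionColumns(new JoinedRow(toUnsafe(currentRow), partitionValues)).copy()

    // After: the converter hands back currentRow as an InternalRow, and a single
    // projection at the file-source level produces the final UnsafeRow.
    val oneCopy = appendPartitionColumns(new JoinedRow(currentRow, partitionValues))

    assert(twoCopies == oneCopy) // same result, one fewer copy per record
  }
}
```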

### Why are the changes needed?
The first conversion, in `ParquetGroupConverter`, is redundant; it is enough for `ParquetRecordReader` to return an `InternalRow` (concretely, a `SpecificInternalRow`).

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?

Unit tests.

Closes #26106 from LuciferYang/spark-parquet-unsafe-projection.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit a988aaf3fa (parent 857f109c47), 2019-10-15 12:42:42 +08:00.
5 changed files with 23 additions and 30 deletions.

#### ParquetFileFormat.scala

```diff
@@ -328,32 +328,28 @@ class ParquetFileFormat
       iter.asInstanceOf[Iterator[InternalRow]]
     } else {
       logDebug(s"Falling back to parquet-mr")
-      // ParquetRecordReader returns UnsafeRow
+      // ParquetRecordReader returns InternalRow
       val readSupport = new ParquetReadSupport(convertTz, enableVectorizedReader = false)
       val reader = if (pushed.isDefined && enableRecordFilter) {
         val parquetFilter = FilterCompat.get(pushed.get, null)
-        new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter)
+        new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
       } else {
-        new ParquetRecordReader[UnsafeRow](readSupport)
+        new ParquetRecordReader[InternalRow](readSupport)
       }
-      val iter = new RecordReaderIterator(reader)
+      val iter = new RecordReaderIterator[InternalRow](reader)
       // SPARK-23457 Register a task completion listener before `initialization`.
       taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
       reader.initialize(split, hadoopAttemptContext)
 
       val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
-      val joinedRow = new JoinedRow()
-      val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+      val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
 
-      // This is a horrible erasure hack... if we type the iterator above, then it actually check
-      // the type in next() and we get a class cast exception. If we make that function return
-      // Object, then we can defer the cast until later!
       if (partitionSchema.length == 0) {
         // There is no partition columns
-        iter.asInstanceOf[Iterator[InternalRow]]
+        iter.map(unsafeProjection)
       } else {
-        iter.asInstanceOf[Iterator[InternalRow]]
-          .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+        val joinedRow = new JoinedRow()
+        iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
       }
     }
   }
```
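With the iterator now typed as `RecordReaderIterator[InternalRow]`, the old "horrible erasure hack" cast is gone; in both branches a single `unsafeProjection` pass produces the `UnsafeRow` that downstream operators expect.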

#### ParquetReadSupport.scala

```diff
@@ -29,13 +29,13 @@ import org.apache.parquet.schema._
 import org.apache.parquet.schema.Type.Repetition
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
  * A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst
- * [[UnsafeRow]]s.
+ * [[InternalRow]]s.
  *
  * The API interface of [[ReadSupport]] is a little bit over complicated because of historical
  * reasons. In older versions of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] need to be
@@ -51,7 +51,7 @@ import org.apache.spark.sql.types._
  */
 class ParquetReadSupport(val convertTz: Option[TimeZone],
     enableVectorizedReader: Boolean)
-  extends ReadSupport[UnsafeRow] with Logging {
+  extends ReadSupport[InternalRow] with Logging {
   private var catalystRequestedSchema: StructType = _
 
   def this() {
@@ -114,13 +114,13 @@ class ParquetReadSupport(val convertTz: Option[TimeZone],
   /**
    * Called on executor side after [[init()]], before instantiating actual Parquet record readers.
    * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet
-   * records to Catalyst [[UnsafeRow]]s.
+   * records to Catalyst [[InternalRow]]s.
    */
   override def prepareForRead(
       conf: Configuration,
       keyValueMetaData: JMap[String, String],
       fileSchema: MessageType,
-      readContext: ReadContext): RecordMaterializer[UnsafeRow] = {
+      readContext: ReadContext): RecordMaterializer[InternalRow] = {
     val parquetRequestedSchema = readContext.getRequestedSchema
     new ParquetRecordMaterializer(
       parquetRequestedSchema,
```
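These are mechanical type changes: `ReadSupport` is parquet-mr's extension point for turning Parquet records into engine rows, and widening its type parameter from `UnsafeRow` to `InternalRow` is what allows the materializer below to return the converter's row without an extra copy.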

#### ParquetRecordMaterializer.scala

```diff
@@ -22,7 +22,7 @@ import java.util.TimeZone
 import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
 import org.apache.parquet.schema.MessageType
 
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -37,12 +37,12 @@ private[parquet] class ParquetRecordMaterializer(
     catalystSchema: StructType,
     schemaConverter: ParquetToSparkSchemaConverter,
     convertTz: Option[TimeZone])
-  extends RecordMaterializer[UnsafeRow] {
+  extends RecordMaterializer[InternalRow] {
 
   private val rootConverter =
     new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, convertTz, NoopUpdater)
 
-  override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord
+  override def getCurrentRecord: InternalRow = rootConverter.currentRecord
 
   override def getRootConverter: GroupConverter = rootConverter
 }
```
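parquet-mr calls `getCurrentRecord` once for every record it assembles, so before this change the `unsafeProjection` inside `ParquetRowConverter.currentRecord` (next hunk) ran per row; now the method simply exposes the already-materialized row.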

#### ParquetRowConverter.scala

```diff
@@ -173,12 +173,10 @@ private[parquet] class ParquetRowConverter(
   private val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))
 
-  private val unsafeProjection = UnsafeProjection.create(catalystType)
-
   /**
-   * The [[UnsafeRow]] converted from an entire Parquet record.
+   * The [[InternalRow]] converted from an entire Parquet record.
    */
-  def currentRecord: UnsafeRow = unsafeProjection(currentRow)
+  def currentRecord: InternalRow = currentRow
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
```
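Note that `currentRow` is a single mutable `SpecificInternalRow` that the converter reuses across records, so callers must not buffer the returned row between reads; both read paths are safe because they immediately apply an `UnsafeProjection`, which copies the values out of the shared row.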

#### ParquetPartitionReaderFactory.scala

```diff
@@ -31,7 +31,6 @@ import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
 import org.apache.spark.sql.execution.datasources.{PartitionedFile, RecordReaderIterator}
@@ -176,7 +175,7 @@ case class ParquetPartitionReaderFactory(
     reader
   }
 
-  private def createRowBaseReader(file: PartitionedFile): RecordReader[Void, UnsafeRow] = {
+  private def createRowBaseReader(file: PartitionedFile): RecordReader[Void, InternalRow] = {
     buildReaderBase(file, createRowBaseParquetReader)
   }
 
@@ -185,16 +184,16 @@ case class ParquetPartitionReaderFactory(
       partitionValues: InternalRow,
       hadoopAttemptContext: TaskAttemptContextImpl,
       pushed: Option[FilterPredicate],
-      convertTz: Option[TimeZone]): RecordReader[Void, UnsafeRow] = {
+      convertTz: Option[TimeZone]): RecordReader[Void, InternalRow] = {
     logDebug(s"Falling back to parquet-mr")
     val taskContext = Option(TaskContext.get())
-    // ParquetRecordReader returns UnsafeRow
+    // ParquetRecordReader returns InternalRow
     val readSupport = new ParquetReadSupport(convertTz, enableVectorizedReader = false)
     val reader = if (pushed.isDefined && enableRecordFilter) {
       val parquetFilter = FilterCompat.get(pushed.get, null)
-      new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter)
+      new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
     } else {
-      new ParquetRecordReader[UnsafeRow](readSupport)
+      new ParquetRecordReader[InternalRow](readSupport)
     }
     val iter = new RecordReaderIterator(reader)
     // SPARK-23457 Register a task completion listener before `initialization`.
```