diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 815b62dfbf..fb5daa4bfa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -328,32 +328,28 @@ class ParquetFileFormat
         iter.asInstanceOf[Iterator[InternalRow]]
       } else {
         logDebug(s"Falling back to parquet-mr")
-        // ParquetRecordReader returns UnsafeRow
+        // ParquetRecordReader returns InternalRow
         val readSupport = new ParquetReadSupport(convertTz, enableVectorizedReader = false)
         val reader = if (pushed.isDefined && enableRecordFilter) {
           val parquetFilter = FilterCompat.get(pushed.get, null)
-          new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter)
+          new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
         } else {
-          new ParquetRecordReader[UnsafeRow](readSupport)
+          new ParquetRecordReader[InternalRow](readSupport)
         }
-        val iter = new RecordReaderIterator(reader)
+        val iter = new RecordReaderIterator[InternalRow](reader)
         // SPARK-23457 Register a task completion listener before `initialization`.
         taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
         reader.initialize(split, hadoopAttemptContext)
 
         val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
-        val joinedRow = new JoinedRow()
-        val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+        val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
 
-        // This is a horrible erasure hack... if we type the iterator above, then it actually check
-        // the type in next() and we get a class cast exception. If we make that function return
-        // Object, then we can defer the cast until later!
         if (partitionSchema.length == 0) {
           // There is no partition columns
-          iter.asInstanceOf[Iterator[InternalRow]]
+          iter.map(unsafeProjection)
         } else {
-          iter.asInstanceOf[Iterator[InternalRow]]
-            .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+          val joinedRow = new JoinedRow()
+          iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
         }
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index 2c7231d2c3..69c8bad5f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -29,13 +29,13 @@ import org.apache.parquet.schema._
 import org.apache.parquet.schema.Type.Repetition
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
  * A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst
- * [[UnsafeRow]]s.
+ * [[InternalRow]]s.
  *
  * The API interface of [[ReadSupport]] is a little bit over complicated because of historical
  * reasons. In older versions of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] need to be
@@ -51,7 +51,7 @@ import org.apache.spark.sql.types._
  */
 class ParquetReadSupport(val convertTz: Option[TimeZone],
     enableVectorizedReader: Boolean)
-  extends ReadSupport[UnsafeRow] with Logging {
+  extends ReadSupport[InternalRow] with Logging {
   private var catalystRequestedSchema: StructType = _
 
   def this() {
@@ -114,13 +114,13 @@ class ParquetReadSupport(val convertTz: Option[TimeZone],
   /**
    * Called on executor side after [[init()]], before instantiating actual Parquet record readers.
    * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet
-   * records to Catalyst [[UnsafeRow]]s.
+   * records to Catalyst [[InternalRow]]s.
    */
   override def prepareForRead(
       conf: Configuration,
       keyValueMetaData: JMap[String, String],
       fileSchema: MessageType,
-      readContext: ReadContext): RecordMaterializer[UnsafeRow] = {
+      readContext: ReadContext): RecordMaterializer[InternalRow] = {
     val parquetRequestedSchema = readContext.getRequestedSchema
     new ParquetRecordMaterializer(
       parquetRequestedSchema,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
index b2459dd0e8..3098a332d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -22,7 +22,7 @@ import java.util.TimeZone
 import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
 import org.apache.parquet.schema.MessageType
 
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -37,12 +37,12 @@ private[parquet] class ParquetRecordMaterializer(
     catalystSchema: StructType,
     schemaConverter: ParquetToSparkSchemaConverter,
     convertTz: Option[TimeZone])
-  extends RecordMaterializer[UnsafeRow] {
+  extends RecordMaterializer[InternalRow] {
 
   private val rootConverter =
     new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, convertTz, NoopUpdater)
 
-  override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord
+  override def getCurrentRecord: InternalRow = rootConverter.currentRecord
 
   override def getRootConverter: GroupConverter = rootConverter
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index b772b6b77d..ff5c724375 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -173,12 +173,10 @@ private[parquet] class ParquetRowConverter(
 
   private val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))
 
-  private val unsafeProjection = UnsafeProjection.create(catalystType)
-
   /**
-   * The [[UnsafeRow]] converted from an entire Parquet record.
+   * The [[InternalRow]] converted from an entire Parquet record.
    */
-  def currentRecord: UnsafeRow = unsafeProjection(currentRow)
+  def currentRecord: InternalRow = currentRow
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index ded145f7de..b2fc724057 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -31,7 +31,6 @@ import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
 import org.apache.spark.sql.execution.datasources.{PartitionedFile, RecordReaderIterator}
@@ -176,7 +175,7 @@ case class ParquetPartitionReaderFactory(
     reader
   }
 
-  private def createRowBaseReader(file: PartitionedFile): RecordReader[Void, UnsafeRow] = {
+  private def createRowBaseReader(file: PartitionedFile): RecordReader[Void, InternalRow] = {
     buildReaderBase(file, createRowBaseParquetReader)
   }
 
@@ -185,16 +184,16 @@ case class ParquetPartitionReaderFactory(
       partitionValues: InternalRow,
       hadoopAttemptContext: TaskAttemptContextImpl,
       pushed: Option[FilterPredicate],
-      convertTz: Option[TimeZone]): RecordReader[Void, UnsafeRow] = {
+      convertTz: Option[TimeZone]): RecordReader[Void, InternalRow] = {
     logDebug(s"Falling back to parquet-mr")
     val taskContext = Option(TaskContext.get())
-    // ParquetRecordReader returns UnsafeRow
+    // ParquetRecordReader returns InternalRow
     val readSupport = new ParquetReadSupport(convertTz, enableVectorizedReader = false)
     val reader = if (pushed.isDefined && enableRecordFilter) {
       val parquetFilter = FilterCompat.get(pushed.get, null)
-      new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter)
+      new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
     } else {
-      new ParquetRecordReader[UnsafeRow](readSupport)
+      new ParquetRecordReader[InternalRow](readSupport)
     }
     val iter = new RecordReaderIterator(reader)
     // SPARK-23457 Register a task completion listener before `initialization`.
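For reference, below is a minimal standalone sketch (not part of the patch) of the pattern the row-based fallback now relies on: the reader hands back a plain InternalRow per record, partition columns are appended with a JoinedRow, and a single projection over the full schema produces the UnsafeRow downstream operators expect. The schemas and values are hypothetical, and UnsafeProjection.create(StructType) stands in for the GenerateUnsafeProjection.generate call used in the patch.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

object RowBasedParquetProjectionSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical schemas standing in for requiredSchema and partitionSchema.
    val dataSchema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
    val partitionSchema = StructType(Seq(StructField("dt", StringType)))
    val fullSchema = StructType(dataSchema.fields ++ partitionSchema.fields)

    // What ParquetRecordReader[InternalRow] now yields: a plain, non-unsafe row.
    val dataRow: InternalRow =
      new GenericInternalRow(Array[Any](42L, UTF8String.fromString("alice")))
    val partitionValues: InternalRow =
      new GenericInternalRow(Array[Any](UTF8String.fromString("2020-01-01")))

    // Same shape as the patched fallback path: append partition columns with a
    // JoinedRow, then convert once to UnsafeRow via a projection over the full schema.
    val toUnsafe = UnsafeProjection.create(fullSchema)
    val joined = new JoinedRow(dataRow, partitionValues)
    val unsafe: UnsafeRow = toUnsafe(joined)

    println(unsafe.getLong(0))        // 42
    println(unsafe.getUTF8String(1))  // alice
    println(unsafe.getUTF8String(2))  // 2020-01-01
  }
}
```

Doing the unsafe conversion once at the end of the iterator, instead of inside ParquetRowConverter, is what lets the converter simply return its mutable currentRow and removes the per-record projection from the materializer.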