[SPARK-14116][SQL] Implements buildReader() for ORC data source
## What changes were proposed in this pull request? This PR implements `FileFormat.buildReader()` for our ORC data source. It also fixes several minor styling issues related to the `HadoopFsRelation` planning code path. Note that `OrcNewInputFormat` doesn't rely on `OrcNewSplit` for creating `OrcRecordReader`s; a plain `FileSplit` is just fine. That's why we can simply create the record reader with the help of `OrcNewInputFormat` and `FileSplit`. ## How was this patch tested? Existing test cases should cover this change. Author: Cheng Lian <lian@databricks.com> Closes #11936 from liancheng/spark-14116-build-reader-for-orc.
This commit is contained in:
parent
8989d3a396
commit
b547de8a60
|
@ -208,9 +208,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
|
||||||
|
|
||||||
val bucketedRDD = new UnionRDD(t.sqlContext.sparkContext,
|
val bucketedRDD = new UnionRDD(t.sqlContext.sparkContext,
|
||||||
(0 until spec.numBuckets).map { bucketId =>
|
(0 until spec.numBuckets).map { bucketId =>
|
||||||
bucketedDataMap.get(bucketId).getOrElse {
|
bucketedDataMap.getOrElse(bucketId, t.sqlContext.emptyResult: RDD[InternalRow])
|
||||||
t.sqlContext.emptyResult: RDD[InternalRow]
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
bucketedRDD
|
bucketedRDD
|
||||||
}
|
}
|
||||||
|
@ -387,7 +385,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
|
||||||
result.setColumn(resultIdx, input.column(inputIdx))
|
result.setColumn(resultIdx, input.column(inputIdx))
|
||||||
inputIdx += 1
|
inputIdx += 1
|
||||||
} else {
|
} else {
|
||||||
require(partitionColumnSchema.fields.filter(_.name.equals(attr.name)).length == 1)
|
require(partitionColumnSchema.fields.count(_.name == attr.name) == 1)
|
||||||
var partitionIdx = 0
|
var partitionIdx = 0
|
||||||
partitionColumnSchema.fields.foreach { f => {
|
partitionColumnSchema.fields.foreach { f => {
|
||||||
if (f.name.equals(attr.name)) {
|
if (f.name.equals(attr.name)) {
|
||||||
|
|
|
@ -32,7 +32,7 @@ case class PartitionedFile(
|
||||||
filePath: String,
|
filePath: String,
|
||||||
start: Long,
|
start: Long,
|
||||||
length: Long) {
|
length: Long) {
|
||||||
override def toString(): String = {
|
override def toString: String = {
|
||||||
s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
|
s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -44,7 +44,7 @@ case class PartitionedFile(
|
||||||
*
|
*
|
||||||
* TODO: This currently does not take locality information about the files into account.
|
* TODO: This currently does not take locality information about the files into account.
|
||||||
*/
|
*/
|
||||||
case class FilePartition(val index: Int, files: Seq[PartitionedFile]) extends Partition
|
case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends Partition
|
||||||
|
|
||||||
class FileScanRDD(
|
class FileScanRDD(
|
||||||
@transient val sqlContext: SQLContext,
|
@transient val sqlContext: SQLContext,
|
||||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
|
||||||
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
||||||
import org.apache.spark.sql.execution.{DataSourceScan, SparkPlan}
|
import org.apache.spark.sql.execution.{DataSourceScan, SparkPlan}
|
||||||
import org.apache.spark.sql.sources._
|
import org.apache.spark.sql.sources._
|
||||||
import org.apache.spark.sql.types._
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A strategy for planning scans over collections of files that might be partitioned or bucketed
|
* A strategy for planning scans over collections of files that might be partitioned or bucketed
|
||||||
|
@ -58,7 +57,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
|
||||||
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
|
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
|
||||||
case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _))
|
case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _))
|
||||||
if (files.fileFormat.toString == "TestFileFormat" ||
|
if (files.fileFormat.toString == "TestFileFormat" ||
|
||||||
files.fileFormat.isInstanceOf[parquet.DefaultSource]) &&
|
files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
|
||||||
|
files.fileFormat.toString == "ORC") &&
|
||||||
files.sqlContext.conf.parquetFileScan =>
|
files.sqlContext.conf.parquetFileScan =>
|
||||||
// Filters on this relation fall into four categories based on where we can use them to avoid
|
// Filters on this relation fall into four categories based on where we can use them to avoid
|
||||||
// reading unneeded data:
|
// reading unneeded data:
|
||||||
|
@ -120,8 +120,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
|
||||||
case Some(bucketing) if files.sqlContext.conf.bucketingEnabled =>
|
case Some(bucketing) if files.sqlContext.conf.bucketingEnabled =>
|
||||||
logInfo(s"Planning with ${bucketing.numBuckets} buckets")
|
logInfo(s"Planning with ${bucketing.numBuckets} buckets")
|
||||||
val bucketed =
|
val bucketed =
|
||||||
selectedPartitions
|
selectedPartitions.flatMap { p =>
|
||||||
.flatMap { p =>
|
|
||||||
p.files.map(f => PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen))
|
p.files.map(f => PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen))
|
||||||
}.groupBy { f =>
|
}.groupBy { f =>
|
||||||
BucketingUtils
|
BucketingUtils
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
|
|
||||||
package org.apache.spark.sql.execution.datasources.text
|
package org.apache.spark.sql.execution.datasources.text
|
||||||
|
|
||||||
import com.google.common.base.Objects
|
|
||||||
import org.apache.hadoop.fs.{FileStatus, Path}
|
import org.apache.hadoop.fs.{FileStatus, Path}
|
||||||
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
|
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
|
||||||
import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
|
import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
|
||||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
|
||||||
|
|
||||||
import org.apache.spark.deploy.SparkHadoopUtil
|
import org.apache.spark.deploy.SparkHadoopUtil
|
||||||
import org.apache.spark.internal.Logging
|
import org.apache.spark.internal.Logging
|
||||||
import org.apache.spark.sql.AnalysisException
|
|
||||||
import org.apache.spark.sql.hive.HiveMetastoreTypes
|
import org.apache.spark.sql.hive.HiveMetastoreTypes
|
||||||
import org.apache.spark.sql.types.StructType
|
import org.apache.spark.sql.types.StructType
|
||||||
|
|
||||||
|
@ -92,7 +91,6 @@ private[orc] object OrcFileOperator extends Logging {
|
||||||
// TODO: Check if the paths coming in are already qualified and simplify.
|
// TODO: Check if the paths coming in are already qualified and simplify.
|
||||||
val origPath = new Path(pathStr)
|
val origPath = new Path(pathStr)
|
||||||
val fs = origPath.getFileSystem(conf)
|
val fs = origPath.getFileSystem(conf)
|
||||||
val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
|
|
||||||
val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
|
val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
|
||||||
.filterNot(_.isDirectory)
|
.filterNot(_.isDirectory)
|
||||||
.map(_.getPath)
|
.map(_.getPath)
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
package org.apache.spark.sql.hive.orc
|
package org.apache.spark.sql.hive.orc
|
||||||
|
|
||||||
|
import java.net.URI
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration
|
import org.apache.hadoop.conf.Configuration
|
||||||
|
@ -24,12 +25,13 @@ import org.apache.hadoop.fs.{FileStatus, Path}
|
||||||
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
|
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
|
||||||
import org.apache.hadoop.hive.ql.io.orc._
|
import org.apache.hadoop.hive.ql.io.orc._
|
||||||
import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties
|
import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties
|
||||||
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
|
import org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, StructObjectInspector}
|
||||||
import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
|
import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
|
||||||
import org.apache.hadoop.io.{NullWritable, Writable}
|
import org.apache.hadoop.io.{NullWritable, Writable}
|
||||||
import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
|
import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
|
||||||
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
|
import org.apache.hadoop.mapreduce._
|
||||||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
|
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
|
||||||
|
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
|
||||||
|
|
||||||
import org.apache.spark.broadcast.Broadcast
|
import org.apache.spark.broadcast.Broadcast
|
||||||
import org.apache.spark.internal.Logging
|
import org.apache.spark.internal.Logging
|
||||||
|
@ -37,6 +39,7 @@ import org.apache.spark.rdd.{HadoopRDD, RDD}
|
||||||
import org.apache.spark.sql.{Row, SQLContext}
|
import org.apache.spark.sql.{Row, SQLContext}
|
||||||
import org.apache.spark.sql.catalyst.InternalRow
|
import org.apache.spark.sql.catalyst.InternalRow
|
||||||
import org.apache.spark.sql.catalyst.expressions._
|
import org.apache.spark.sql.catalyst.expressions._
|
||||||
|
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
|
||||||
import org.apache.spark.sql.execution.datasources._
|
import org.apache.spark.sql.execution.datasources._
|
||||||
import org.apache.spark.sql.hive.{HiveInspectors, HiveMetastoreTypes, HiveShim}
|
import org.apache.spark.sql.hive.{HiveInspectors, HiveMetastoreTypes, HiveShim}
|
||||||
import org.apache.spark.sql.sources.{Filter, _}
|
import org.apache.spark.sql.sources.{Filter, _}
|
||||||
|
@ -44,7 +47,8 @@ import org.apache.spark.sql.types.StructType
|
||||||
import org.apache.spark.util.SerializableConfiguration
|
import org.apache.spark.util.SerializableConfiguration
|
||||||
import org.apache.spark.util.collection.BitSet
|
import org.apache.spark.util.collection.BitSet
|
||||||
|
|
||||||
private[sql] class DefaultSource extends FileFormat with DataSourceRegister {
|
private[sql] class DefaultSource
|
||||||
|
extends FileFormat with DataSourceRegister with Serializable {
|
||||||
|
|
||||||
override def shortName(): String = "orc"
|
override def shortName(): String = "orc"
|
||||||
|
|
||||||
|
@ -55,7 +59,9 @@ private[sql] class DefaultSource extends FileFormat with DataSourceRegister {
|
||||||
options: Map[String, String],
|
options: Map[String, String],
|
||||||
files: Seq[FileStatus]): Option[StructType] = {
|
files: Seq[FileStatus]): Option[StructType] = {
|
||||||
OrcFileOperator.readSchema(
|
OrcFileOperator.readSchema(
|
||||||
files.map(_.getPath.toUri.toString), Some(sqlContext.sparkContext.hadoopConfiguration))
|
files.map(_.getPath.toUri.toString),
|
||||||
|
Some(sqlContext.sparkContext.hadoopConfiguration)
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def prepareWrite(
|
override def prepareWrite(
|
||||||
|
@ -117,6 +123,68 @@ private[sql] class DefaultSource extends FileFormat with DataSourceRegister {
|
||||||
val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes
|
val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes
|
||||||
OrcTableScan(sqlContext, output, filters, inputFiles).execute()
|
OrcTableScan(sqlContext, output, filters, inputFiles).execute()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override def buildReader(
|
||||||
|
sqlContext: SQLContext,
|
||||||
|
partitionSchema: StructType,
|
||||||
|
dataSchema: StructType,
|
||||||
|
filters: Seq[Filter],
|
||||||
|
options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
|
||||||
|
val orcConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
|
||||||
|
|
||||||
|
if (sqlContext.conf.orcFilterPushDown) {
|
||||||
|
// Sets pushed predicates
|
||||||
|
OrcFilters.createFilter(filters.toArray).foreach { f =>
|
||||||
|
orcConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
|
||||||
|
orcConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(orcConf))
|
||||||
|
|
||||||
|
(file: PartitionedFile) => {
|
||||||
|
val conf = broadcastedConf.value.value
|
||||||
|
|
||||||
|
// SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
|
||||||
|
// case, `OrcFileOperator.readSchema` returns `None`, and we can simply return an empty
|
||||||
|
// iterator.
|
||||||
|
val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf))
|
||||||
|
if (maybePhysicalSchema.isEmpty) {
|
||||||
|
Iterator.empty
|
||||||
|
} else {
|
||||||
|
val physicalSchema = maybePhysicalSchema.get
|
||||||
|
OrcRelation.setRequiredColumns(conf, physicalSchema, dataSchema)
|
||||||
|
|
||||||
|
val orcRecordReader = {
|
||||||
|
val job = Job.getInstance(conf)
|
||||||
|
FileInputFormat.setInputPaths(job, file.filePath)
|
||||||
|
|
||||||
|
val inputFormat = new OrcNewInputFormat
|
||||||
|
val fileSplit = new FileSplit(
|
||||||
|
new Path(new URI(file.filePath)), file.start, file.length, Array.empty
|
||||||
|
)
|
||||||
|
|
||||||
|
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
|
||||||
|
val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
|
||||||
|
inputFormat.createRecordReader(fileSplit, hadoopAttemptContext)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unwraps `OrcStruct`s to `UnsafeRow`s
|
||||||
|
val unsafeRowIterator = OrcRelation.unwrapOrcStructs(
|
||||||
|
file.filePath, conf, dataSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
|
||||||
|
)
|
||||||
|
|
||||||
|
// Appends partition values
|
||||||
|
val fullOutput = dataSchema.toAttributes ++ partitionSchema.toAttributes
|
||||||
|
val joinedRow = new JoinedRow()
|
||||||
|
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)
|
||||||
|
|
||||||
|
unsafeRowIterator.map { dataRow =>
|
||||||
|
appendPartitionColumns(joinedRow(dataRow, file.partitionValues))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private[orc] class OrcOutputWriter(
|
private[orc] class OrcOutputWriter(
|
||||||
|
@ -225,55 +293,6 @@ private[orc] case class OrcTableScan(
|
||||||
extends Logging
|
extends Logging
|
||||||
with HiveInspectors {
|
with HiveInspectors {
|
||||||
|
|
||||||
private def addColumnIds(
|
|
||||||
dataSchema: StructType,
|
|
||||||
output: Seq[Attribute],
|
|
||||||
conf: Configuration): Unit = {
|
|
||||||
val ids = output.map(a => dataSchema.fieldIndex(a.name): Integer)
|
|
||||||
val (sortedIds, sortedNames) = ids.zip(attributes.map(_.name)).sorted.unzip
|
|
||||||
HiveShim.appendReadColumns(conf, sortedIds, sortedNames)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Transform all given raw `Writable`s into `InternalRow`s.
|
|
||||||
private def fillObject(
|
|
||||||
path: String,
|
|
||||||
conf: Configuration,
|
|
||||||
iterator: Iterator[Writable],
|
|
||||||
nonPartitionKeyAttrs: Seq[Attribute]): Iterator[InternalRow] = {
|
|
||||||
val deserializer = new OrcSerde
|
|
||||||
val maybeStructOI = OrcFileOperator.getObjectInspector(path, Some(conf))
|
|
||||||
val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
|
|
||||||
val unsafeProjection = UnsafeProjection.create(StructType.fromAttributes(nonPartitionKeyAttrs))
|
|
||||||
|
|
||||||
// SPARK-8501: ORC writes an empty schema ("struct<>") to an ORC file if the file contains zero
|
|
||||||
// rows, and thus couldn't give a proper ObjectInspector. In this case we just return an empty
|
|
||||||
// partition since we know that this file is empty.
|
|
||||||
maybeStructOI.map { soi =>
|
|
||||||
val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.zipWithIndex.map {
|
|
||||||
case (attr, ordinal) =>
|
|
||||||
soi.getStructFieldRef(attr.name) -> ordinal
|
|
||||||
}.unzip
|
|
||||||
val unwrappers = fieldRefs.map(unwrapperFor)
|
|
||||||
// Map each tuple to a row object
|
|
||||||
iterator.map { value =>
|
|
||||||
val raw = deserializer.deserialize(value)
|
|
||||||
var i = 0
|
|
||||||
while (i < fieldRefs.length) {
|
|
||||||
val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
|
|
||||||
if (fieldValue == null) {
|
|
||||||
mutableRow.setNullAt(fieldOrdinals(i))
|
|
||||||
} else {
|
|
||||||
unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
|
|
||||||
}
|
|
||||||
i += 1
|
|
||||||
}
|
|
||||||
unsafeProjection(mutableRow)
|
|
||||||
}
|
|
||||||
}.getOrElse {
|
|
||||||
Iterator.empty
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def execute(): RDD[InternalRow] = {
|
def execute(): RDD[InternalRow] = {
|
||||||
val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
|
val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
|
||||||
val conf = job.getConfiguration
|
val conf = job.getConfiguration
|
||||||
|
@ -294,7 +313,7 @@ private[orc] case class OrcTableScan(
|
||||||
.inferSchema(sqlContext, Map.empty, inputPaths)
|
.inferSchema(sqlContext, Map.empty, inputPaths)
|
||||||
.getOrElse(sys.error("Failed to read schema from target ORC files."))
|
.getOrElse(sys.error("Failed to read schema from target ORC files."))
|
||||||
// Sets requested columns
|
// Sets requested columns
|
||||||
addColumnIds(dataSchema, attributes, conf)
|
OrcRelation.setRequiredColumns(conf, dataSchema, StructType.fromAttributes(attributes))
|
||||||
|
|
||||||
if (inputPaths.isEmpty) {
|
if (inputPaths.isEmpty) {
|
||||||
// the input path probably be pruned, return an empty RDD.
|
// the input path probably be pruned, return an empty RDD.
|
||||||
|
@ -317,7 +336,12 @@ private[orc] case class OrcTableScan(
|
||||||
|
|
||||||
rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
|
rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
|
||||||
val writableIterator = iterator.map(_._2)
|
val writableIterator = iterator.map(_._2)
|
||||||
fillObject(split.getPath.toString, wrappedConf.value, writableIterator, attributes)
|
OrcRelation.unwrapOrcStructs(
|
||||||
|
split.getPath.toString,
|
||||||
|
wrappedConf.value,
|
||||||
|
StructType.fromAttributes(attributes),
|
||||||
|
writableIterator
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -327,7 +351,7 @@ private[orc] object OrcTableScan {
|
||||||
private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
|
private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
|
||||||
}
|
}
|
||||||
|
|
||||||
private[orc] object OrcRelation {
|
private[orc] object OrcRelation extends HiveInspectors {
|
||||||
// The ORC compression short names
|
// The ORC compression short names
|
||||||
val shortOrcCompressionCodecNames = Map(
|
val shortOrcCompressionCodecNames = Map(
|
||||||
"none" -> CompressionKind.NONE,
|
"none" -> CompressionKind.NONE,
|
||||||
|
@ -343,5 +367,47 @@ private[orc] object OrcRelation {
|
||||||
CompressionKind.ZLIB.name -> ".zlib",
|
CompressionKind.ZLIB.name -> ".zlib",
|
||||||
CompressionKind.LZO.name -> ".lzo"
|
CompressionKind.LZO.name -> ".lzo"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def unwrapOrcStructs(
|
||||||
|
filePath: String,
|
||||||
|
conf: Configuration,
|
||||||
|
dataSchema: StructType,
|
||||||
|
iterator: Iterator[Writable]): Iterator[InternalRow] = {
|
||||||
|
val deserializer = new OrcSerde
|
||||||
|
val maybeStructOI = OrcFileOperator.getObjectInspector(filePath, Some(conf))
|
||||||
|
val mutableRow = new SpecificMutableRow(dataSchema.map(_.dataType))
|
||||||
|
val unsafeProjection = UnsafeProjection.create(dataSchema)
|
||||||
|
|
||||||
|
def unwrap(oi: StructObjectInspector): Iterator[InternalRow] = {
|
||||||
|
val (fieldRefs, fieldOrdinals) = dataSchema.zipWithIndex.map {
|
||||||
|
case (field, ordinal) => oi.getStructFieldRef(field.name) -> ordinal
|
||||||
|
}.unzip
|
||||||
|
|
||||||
|
val unwrappers = fieldRefs.map(unwrapperFor)
|
||||||
|
|
||||||
|
iterator.map { value =>
|
||||||
|
val raw = deserializer.deserialize(value)
|
||||||
|
var i = 0
|
||||||
|
while (i < fieldRefs.length) {
|
||||||
|
val fieldValue = oi.getStructFieldData(raw, fieldRefs(i))
|
||||||
|
if (fieldValue == null) {
|
||||||
|
mutableRow.setNullAt(fieldOrdinals(i))
|
||||||
|
} else {
|
||||||
|
unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
|
||||||
|
}
|
||||||
|
i += 1
|
||||||
|
}
|
||||||
|
unsafeProjection(mutableRow)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
maybeStructOI.map(unwrap).getOrElse(Iterator.empty)
|
||||||
|
}
|
||||||
|
|
||||||
|
def setRequiredColumns(
|
||||||
|
conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = {
|
||||||
|
val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer)
|
||||||
|
val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip
|
||||||
|
HiveShim.appendReadColumns(conf, sortedIDs, sortedNames)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue