[SPARK-22282][SQL] Rename OrcRelation to OrcFileFormat and remove ORC_COMPRESSION

## What changes were proposed in this pull request?

This PR aims to
- Rename the `OrcRelation` object to `OrcFileFormat`.
- Replace `OrcRelation.ORC_COMPRESSION` with `org.apache.orc.OrcConf.COMPRESS`. Since [SPARK-21422](https://issues.apache.org/jira/browse/SPARK-21422), we can use `OrcConf.COMPRESS` instead of Hive's constant.

```scala
// The references of Hive's classes will be minimized.
val ORC_COMPRESSION = "orc.compress"
```
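Since `OrcConf.COMPRESS` carries the same `orc.compress` key, the change is a drop-in replacement; a minimal sketch of the new style (the Hadoop `Configuration` usage here is only for illustration):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.orc.OrcConf.COMPRESS

// COMPRESS.getAttribute returns "orc.compress", the same key the removed
// ORC_COMPRESSION constant held, so the configuration written is unchanged.
val hadoopConf = new Configuration()
hadoopConf.set(COMPRESS.getAttribute, "SNAPPY")
```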

## How was this patch tested?

Pass Jenkins with the existing and updated test cases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19502 from dongjoon-hyun/SPARK-22282.
Committed by gatorsmile on 2017-10-16 11:27:08 -07:00
Commit 561505e2fc (parent 0fa10666cf)

5 changed files with 27 additions and 23 deletions

```diff
@@ -520,8 +520,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`compression` (default is the value specified in `spark.sql.orc.compression.codec`):
    * compression codec to use when saving to file. This can be one of the known case-insensitive
    * shorten names(`none`, `snappy`, `zlib`, and `lzo`). This will override
-   * `orc.compress` and `spark.sql.parquet.compression.codec`. If `orc.compress` is given,
-   * it overrides `spark.sql.parquet.compression.codec`.</li>
+   * `orc.compress` and `spark.sql.orc.compression.codec`. If `orc.compress` is given,
+   * it overrides `spark.sql.orc.compression.codec`.</li>
    * </ul>
    *
    * @since 1.5.0
```

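For reference, the precedence described in the corrected Scaladoc can be exercised through the writer options; a hedged usage sketch (the output path and a running `SparkSession` named `spark` are assumptions, not part of this patch):

```scala
// Highest to lowest precedence: `compression` > `orc.compress` >
// spark.sql.orc.compression.codec.
spark.conf.set("spark.sql.orc.compression.codec", "zlib")
spark.range(10).write
  .option("orc.compress", "snappy")  // overrides the session-level codec
  .option("compression", "none")     // overrides `orc.compress` as well
  .orc("/tmp/orc_compression_demo")  // hypothetical output path
```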
```diff
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.{NullWritable, Writable}
 import org.apache.hadoop.mapred.{JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.orc.OrcConf.COMPRESS
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
@@ -72,7 +73,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     val configuration = job.getConfiguration
-    configuration.set(OrcRelation.ORC_COMPRESSION, orcOptions.compressionCodec)
+    configuration.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
     configuration match {
       case conf: JobConf =>
         conf.setOutputFormat(classOf[OrcOutputFormat])
@@ -93,8 +94,8 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
   override def getFileExtension(context: TaskAttemptContext): String = {
     val compressionExtension: String = {
-      val name = context.getConfiguration.get(OrcRelation.ORC_COMPRESSION)
-      OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "")
+      val name = context.getConfiguration.get(COMPRESS.getAttribute)
+      OrcFileFormat.extensionsForCompressionCodecNames.getOrElse(name, "")
     }
     compressionExtension + ".orc"
@@ -120,7 +121,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     if (sparkSession.sessionState.conf.orcFilterPushDown) {
       // Sets pushed predicates
       OrcFilters.createFilter(requiredSchema, filters.toArray).foreach { f =>
-        hadoopConf.set(OrcRelation.SARG_PUSHDOWN, f.toKryo)
+        hadoopConf.set(OrcFileFormat.SARG_PUSHDOWN, f.toKryo)
         hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
       }
     }
@@ -138,7 +139,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       if (isEmptyFile) {
        Iterator.empty
      } else {
-        OrcRelation.setRequiredColumns(conf, dataSchema, requiredSchema)
+        OrcFileFormat.setRequiredColumns(conf, dataSchema, requiredSchema)
        val orcRecordReader = {
          val job = Job.getInstance(conf)
@@ -160,7 +161,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => recordsIterator.close()))
      // Unwraps `OrcStruct`s to `UnsafeRow`s
-      OrcRelation.unwrapOrcStructs(
+      OrcFileFormat.unwrapOrcStructs(
        conf,
        dataSchema,
        requiredSchema,
@@ -255,10 +256,7 @@ private[orc] class OrcOutputWriter(
   }
 }
-private[orc] object OrcRelation extends HiveInspectors {
-  // The references of Hive's classes will be minimized.
-  val ORC_COMPRESSION = "orc.compress"
+private[orc] object OrcFileFormat extends HiveInspectors {
   // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
   private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
```

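The `SARG_PUSHDOWN` hunk above only takes effect when ORC filter pushdown is enabled; a short usage sketch (the path and `SparkSession` named `spark` are assumptions):

```scala
// With spark.sql.orc.filterPushdown enabled, supported filters are turned into
// a SearchArgument and handed to the ORC record reader.
spark.conf.set("spark.sql.orc.filterPushdown", "true")
val df = spark.read.orc("/tmp/orc_compression_demo")  // hypothetical input path
df.filter(df("id") > 5).show()
```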
```diff
@@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.orc
 import java.util.Locale
+import org.apache.orc.OrcConf.COMPRESS
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.internal.SQLConf
@@ -40,9 +42,9 @@ private[orc] class OrcOptions(
    * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
    */
   val compressionCodec: String = {
-    // `compression`, `orc.compress`, and `spark.sql.orc.compression.codec` are
-    // in order of precedence from highest to lowest.
-    val orcCompressionConf = parameters.get(OrcRelation.ORC_COMPRESSION)
+    // `compression`, `orc.compress`(i.e., OrcConf.COMPRESS), and `spark.sql.orc.compression.codec`
+    // are in order of precedence from highest to lowest.
+    val orcCompressionConf = parameters.get(COMPRESS.getAttribute)
     val codecName = parameters
       .get("compression")
       .orElse(orcCompressionConf)
```

```diff
@@ -22,6 +22,7 @@ import java.sql.Timestamp
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.io.orc.{OrcStruct, SparkOrcNewRecordReader}
+import org.apache.orc.OrcConf.COMPRESS
 import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.sql._
@@ -176,11 +177,11 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }
-  test("SPARK-16610: Respect orc.compress option when compression is unset") {
-    // Respect `orc.compress`.
+  test("SPARK-16610: Respect orc.compress (i.e., OrcConf.COMPRESS) when compression is unset") {
+    // Respect `orc.compress` (i.e., OrcConf.COMPRESS).
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("orc.compress", "ZLIB")
+        .option(COMPRESS.getAttribute, "ZLIB")
         .orc(file.getCanonicalPath)
       val expectedCompressionKind =
         OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
@@ -191,7 +192,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     withTempPath { file =>
       spark.range(0, 10).write
         .option("compression", "ZLIB")
-        .option("orc.compress", "SNAPPY")
+        .option(COMPRESS.getAttribute, "SNAPPY")
         .orc(file.getCanonicalPath)
       val expectedCompressionKind =
         OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
@@ -598,7 +599,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       val requestedSchema = StructType(Nil)
       val conf = new Configuration()
       val physicalSchema = OrcFileOperator.readSchema(Seq(path), Some(conf)).get
-      OrcRelation.setRequiredColumns(conf, physicalSchema, requestedSchema)
+      OrcFileFormat.setRequiredColumns(conf, physicalSchema, requestedSchema)
       val maybeOrcReader = OrcFileOperator.getFileReader(path, Some(conf))
       assert(maybeOrcReader.isDefined)
       val orcRecordReader = new SparkOrcNewRecordReader(
```

```diff
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.hive.orc
 import java.io.File
+import java.util.Locale
+import org.apache.orc.OrcConf.COMPRESS
 import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.sql.{QueryTest, Row}
@@ -150,7 +152,8 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
     val conf = sqlContext.sessionState.conf
-    assert(new OrcOptions(Map("Orc.Compress" -> "NONE"), conf).compressionCodec == "NONE")
+    val option = new OrcOptions(Map(COMPRESS.getAttribute.toUpperCase(Locale.ROOT) -> "NONE"), conf)
+    assert(option.compressionCodec == "NONE")
   }
   test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
@@ -205,8 +208,8 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
     // `compression` -> `orc.compression` -> `spark.sql.orc.compression.codec`
     withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
       assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE")
-      val map1 = Map("orc.compress" -> "zlib")
-      val map2 = Map("orc.compress" -> "zlib", "compression" -> "lzo")
+      val map1 = Map(COMPRESS.getAttribute -> "zlib")
+      val map2 = Map(COMPRESS.getAttribute -> "zlib", "compression" -> "lzo")
       assert(new OrcOptions(map1, conf).compressionCodec == "ZLIB")
       assert(new OrcOptions(map2, conf).compressionCodec == "LZO")
     }
```
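As the SPARK-18433 test above shows, ORC data source option keys are matched case-insensitively; a hedged sketch of the user-facing behavior (the path and `SparkSession` named `spark` are assumptions):

```scala
// "ORC.COMPRESS" resolves to the same option as COMPRESS.getAttribute
// ("orc.compress") because option keys go through a case-insensitive map.
spark.range(10).write
  .option("ORC.COMPRESS", "zlib")
  .orc("/tmp/orc_case_demo")  // hypothetical output path
```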