[SPARK-25635][SQL][BUILD] Support selective direct encoding in native ORC write
## What changes were proposed in this pull request? Before ORC 1.5.3, `orc.dictionary.key.threshold` and `hive.exec.orc.dictionary.key.size.threshold` are applied for all columns. This has been a big huddle to enable dictionary encoding. From ORC 1.5.3, `orc.column.encoding.direct` is added to enforce direct encoding selectively in a column-wise manner. This PR aims to add that feature by upgrading ORC from 1.5.2 to 1.5.3. The followings are the patches in ORC 1.5.3 and this feature is the only one related to Spark directly. ``` ORC-406: ORC: Char(n) and Varchar(n) writers truncate to n bytes & corrupts multi-byte data (gopalv) ORC-403: [C++] Add checks to avoid invalid offsets in InputStream ORC-405: Remove calcite as a dependency from the benchmarks. ORC-375: Fix libhdfs on gcc7 by adding #include <functional> two places. ORC-383: Parallel builds fails with ConcurrentModificationException ORC-382: Apache rat exclusions + add rat check to travis ORC-401: Fix incorrect quoting in specification. ORC-385: Change RecordReader to extend Closeable. ORC-384: [C++] fix memory leak when loading non-ORC files ORC-391: [c++] parseType does not accept underscore in the field name ORC-397: Allow selective disabling of dictionary encoding. Original patch was by Mithun Radhakrishnan. ORC-389: Add ability to not decode Acid metadata columns ``` ## How was this patch tested? Pass the Jenkins with newly added test cases. Closes #22622 from dongjoon-hyun/SPARK-25635. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: gatorsmile <gatorsmile@gmail.com>
This commit is contained in:
parent
a433fbcee6
commit
1c9486c1ac
|
@ -153,9 +153,9 @@ objenesis-2.5.1.jar
|
|||
okhttp-3.8.1.jar
|
||||
okio-1.13.0.jar
|
||||
opencsv-2.3.jar
|
||||
orc-core-1.5.2-nohive.jar
|
||||
orc-mapreduce-1.5.2-nohive.jar
|
||||
orc-shims-1.5.2.jar
|
||||
orc-core-1.5.3-nohive.jar
|
||||
orc-mapreduce-1.5.3-nohive.jar
|
||||
orc-shims-1.5.3.jar
|
||||
oro-2.0.8.jar
|
||||
osgi-resource-locator-1.0.1.jar
|
||||
paranamer-2.8.jar
|
||||
|
|
|
@ -154,9 +154,9 @@ objenesis-2.5.1.jar
|
|||
okhttp-3.8.1.jar
|
||||
okio-1.13.0.jar
|
||||
opencsv-2.3.jar
|
||||
orc-core-1.5.2-nohive.jar
|
||||
orc-mapreduce-1.5.2-nohive.jar
|
||||
orc-shims-1.5.2.jar
|
||||
orc-core-1.5.3-nohive.jar
|
||||
orc-mapreduce-1.5.3-nohive.jar
|
||||
orc-shims-1.5.3.jar
|
||||
oro-2.0.8.jar
|
||||
osgi-resource-locator-1.0.1.jar
|
||||
paranamer-2.8.jar
|
||||
|
|
|
@ -172,9 +172,9 @@ okhttp-2.7.5.jar
|
|||
okhttp-3.8.1.jar
|
||||
okio-1.13.0.jar
|
||||
opencsv-2.3.jar
|
||||
orc-core-1.5.2-nohive.jar
|
||||
orc-mapreduce-1.5.2-nohive.jar
|
||||
orc-shims-1.5.2.jar
|
||||
orc-core-1.5.3-nohive.jar
|
||||
orc-mapreduce-1.5.3-nohive.jar
|
||||
orc-shims-1.5.3.jar
|
||||
oro-2.0.8.jar
|
||||
osgi-resource-locator-1.0.1.jar
|
||||
paranamer-2.8.jar
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -131,7 +131,7 @@
|
|||
<hive.version.short>1.2.1</hive.version.short>
|
||||
<derby.version>10.12.1.1</derby.version>
|
||||
<parquet.version>1.10.0</parquet.version>
|
||||
<orc.version>1.5.2</orc.version>
|
||||
<orc.version>1.5.3</orc.version>
|
||||
<orc.classifier>nohive</orc.classifier>
|
||||
<hive.parquet.version>1.6.0</hive.parquet.version>
|
||||
<jetty.version>9.3.24.v20180605</jetty.version>
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration
|
|||
import org.apache.hadoop.fs.Path
|
||||
import org.apache.orc.OrcConf.COMPRESS
|
||||
import org.apache.orc.OrcFile
|
||||
import org.apache.orc.OrcProto.ColumnEncoding.Kind.{DICTIONARY_V2, DIRECT, DIRECT_V2}
|
||||
import org.apache.orc.OrcProto.Stream.Kind
|
||||
import org.apache.orc.impl.RecordReaderImpl
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
|
@ -115,6 +116,76 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
|
|||
}
|
||||
}
|
||||
|
||||
protected def testSelectiveDictionaryEncoding(isSelective: Boolean) {
|
||||
val tableName = "orcTable"
|
||||
|
||||
withTempDir { dir =>
|
||||
withTable(tableName) {
|
||||
val sqlStatement = orcImp match {
|
||||
case "native" =>
|
||||
s"""
|
||||
|CREATE TABLE $tableName (zipcode STRING, uniqColumn STRING, value DOUBLE)
|
||||
|USING ORC
|
||||
|OPTIONS (
|
||||
| path '${dir.toURI}',
|
||||
| orc.dictionary.key.threshold '1.0',
|
||||
| orc.column.encoding.direct 'uniqColumn'
|
||||
|)
|
||||
""".stripMargin
|
||||
case "hive" =>
|
||||
s"""
|
||||
|CREATE TABLE $tableName (zipcode STRING, uniqColumn STRING, value DOUBLE)
|
||||
|STORED AS ORC
|
||||
|LOCATION '${dir.toURI}'
|
||||
|TBLPROPERTIES (
|
||||
| orc.dictionary.key.threshold '1.0',
|
||||
| hive.exec.orc.dictionary.key.size.threshold '1.0',
|
||||
| orc.column.encoding.direct 'uniqColumn'
|
||||
|)
|
||||
""".stripMargin
|
||||
case impl =>
|
||||
throw new UnsupportedOperationException(s"Unknown ORC implementation: $impl")
|
||||
}
|
||||
|
||||
sql(sqlStatement)
|
||||
sql(s"INSERT INTO $tableName VALUES ('94086', 'random-uuid-string', 0.0)")
|
||||
|
||||
val partFiles = dir.listFiles()
|
||||
.filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
|
||||
assert(partFiles.length === 1)
|
||||
|
||||
val orcFilePath = new Path(partFiles.head.getAbsolutePath)
|
||||
val readerOptions = OrcFile.readerOptions(new Configuration())
|
||||
val reader = OrcFile.createReader(orcFilePath, readerOptions)
|
||||
var recordReader: RecordReaderImpl = null
|
||||
try {
|
||||
recordReader = reader.rows.asInstanceOf[RecordReaderImpl]
|
||||
|
||||
// Check the kind
|
||||
val stripe = recordReader.readStripeFooter(reader.getStripes.get(0))
|
||||
|
||||
// The encodings are divided into direct or dictionary-based categories and
|
||||
// further refined as to whether they use RLE v1 or v2. RLE v1 is used by
|
||||
// Hive 0.11 and RLE v2 is introduced in Hive 0.12 ORC with more improvements.
|
||||
// For more details, see https://orc.apache.org/specification/
|
||||
assert(stripe.getColumns(1).getKind === DICTIONARY_V2)
|
||||
if (isSelective) {
|
||||
assert(stripe.getColumns(2).getKind === DIRECT_V2)
|
||||
} else {
|
||||
assert(stripe.getColumns(2).getKind === DICTIONARY_V2)
|
||||
}
|
||||
// Floating point types are stored with DIRECT encoding in IEEE 754 floating
|
||||
// point bit layout.
|
||||
assert(stripe.getColumns(3).getKind === DIRECT)
|
||||
} finally {
|
||||
if (recordReader != null) {
|
||||
recordReader.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("create temporary orc table") {
|
||||
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))
|
||||
|
||||
|
@ -284,4 +355,8 @@ class OrcSourceSuite extends OrcSuite with SharedSQLContext {
|
|||
test("Check BloomFilter creation") {
|
||||
testBloomFilterCreation(Kind.BLOOM_FILTER_UTF8) // After ORC-101
|
||||
}
|
||||
|
||||
test("Enforce direct encoding column-wise selectively") {
|
||||
testSelectiveDictionaryEncoding(isSelective = true)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -182,4 +182,12 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("Enforce direct encoding column-wise selectively") {
|
||||
Seq(true, false).foreach { convertMetastore =>
|
||||
withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> s"$convertMetastore") {
|
||||
testSelectiveDictionaryEncoding(isSelective = false)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue