From acd3856215c9fce44183513a4385e459b6b92b17 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 7 Sep 2021 13:19:58 -0700 Subject: [PATCH] [SPARK-36682][CORE][TEST] Add Hadoop sequence file test for different Hadoop codecs ### What changes were proposed in this pull request? This patch proposes to add e2e tests for using Hadoop codecs to write sequence files. ### Why are the changes needed? To improve test coverage. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added tests. Closes #33924 from viirya/hadoop-seq-test. Authored-by: Liang-Chi Hsieh Signed-off-by: Liang-Chi Hsieh (cherry picked from commit 6745d778184a7ecdff77458d0bca65d3f637affe) Signed-off-by: Liang-Chi Hsieh --- .../scala/org/apache/spark/FileSuite.scala | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index f953bf4043..67a9764ee6 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -28,7 +28,7 @@ import com.google.common.io.Files import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io._ -import org.apache.hadoop.io.compress.DefaultCodec +import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, DefaultCodec} import org.apache.hadoop.mapred.{FileAlreadyExistsException, FileSplit, JobConf, TextInputFormat, TextOutputFormat} import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat} @@ -113,25 +113,33 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)")) } - test("SequenceFile (compressed)") { - sc = new SparkContext("local", "test") - val normalDir = new File(tempDir, "output_normal").getAbsolutePath - val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath - val codec = new DefaultCodec() + def runSequenceFileCodecTest(codec: CompressionCodec, codecName: String): Unit = { + test(s"SequenceFile (compressed) - $codecName") { + sc = new SparkContext("local", "test") + val normalDir = new File(tempDir, "output_normal").getAbsolutePath + val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath - val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x)) - data.saveAsSequenceFile(normalDir) - data.saveAsSequenceFile(compressedOutputDir, Some(classOf[DefaultCodec])) + val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x)) + data.saveAsSequenceFile(normalDir) + data.saveAsSequenceFile(compressedOutputDir, Some(codec.getClass)) - val normalFile = new File(normalDir, "part-00000") - val normalContent = sc.sequenceFile[String, String](normalDir).collect - assert(normalContent === Array.fill(100)(("abc", "abc"))) + val normalFile = new File(normalDir, "part-00000") + val normalContent = sc.sequenceFile[String, String](normalDir).collect + assert(normalContent === Array.fill(100)(("abc", "abc"))) - val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension) - val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect - assert(compressedContent === Array.fill(100)(("abc", "abc"))) + val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension) + val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect + assert(compressedContent === Array.fill(100)(("abc", "abc"))) - assert(compressedFile.length < normalFile.length) + assert(compressedFile.length < normalFile.length) + } + } + + // Hadoop "gzip" and "zstd" codecs require native library installed for sequence files + // "snappy" and "lz4" codecs do not work due to SPARK-36669 and SPARK-36681. + Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2")).foreach { + case (codec, codecName) => + runSequenceFileCodecTest(codec, codecName) } test("SequenceFile with writable key") {