[SPARK-36682][CORE][TEST] Add Hadoop sequence file test for different Hadoop codecs

### What changes were proposed in this pull request?

This patch proposes to add end-to-end tests that write and read back Hadoop sequence files with different Hadoop compression codecs.
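For reference, here is a minimal sketch (not part of this patch) of the RDD API the new tests exercise, assuming a local SparkContext and an illustrative output path:

```scala
import org.apache.hadoop.io.compress.BZip2Codec
import org.apache.spark.{SparkConf, SparkContext}

object SequenceFileCodecExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("seq-codec-example"))
    try {
      // Sequence files store key/value records, so build a pair RDD.
      val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x))
      // Write compressed output with an explicit Hadoop codec class
      // (BZip2Codec needs no native library; the path is illustrative).
      data.saveAsSequenceFile("/tmp/seq-bzip2", Some(classOf[BZip2Codec]))
      // Read it back; the codec is resolved from the file itself.
      val readBack = sc.sequenceFile[String, String]("/tmp/seq-bzip2").collect()
      assert(readBack.length == 100)
    } finally {
      sc.stop()
    }
  }
}
```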

### Why are the changes needed?

To improve test coverage.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added tests.

Closes #33924 from viirya/hadoop-seq-test.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 6745d77818)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
Liang-Chi Hsieh 2021-09-07 13:19:58 -07:00
parent a41dc4516e
commit acd3856215

@@ -28,7 +28,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io._
-import org.apache.hadoop.io.compress.DefaultCodec
+import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, DefaultCodec}
 import org.apache.hadoop.mapred.{FileAlreadyExistsException, FileSplit, JobConf, TextInputFormat, TextOutputFormat}
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
@@ -113,25 +113,33 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
   }
 
-  test("SequenceFile (compressed)") {
-    sc = new SparkContext("local", "test")
-    val normalDir = new File(tempDir, "output_normal").getAbsolutePath
-    val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
-    val codec = new DefaultCodec()
+  def runSequenceFileCodecTest(codec: CompressionCodec, codecName: String): Unit = {
+    test(s"SequenceFile (compressed) - $codecName") {
+      sc = new SparkContext("local", "test")
+      val normalDir = new File(tempDir, "output_normal").getAbsolutePath
+      val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
 
-    val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x))
-    data.saveAsSequenceFile(normalDir)
-    data.saveAsSequenceFile(compressedOutputDir, Some(classOf[DefaultCodec]))
+      val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x))
+      data.saveAsSequenceFile(normalDir)
+      data.saveAsSequenceFile(compressedOutputDir, Some(codec.getClass))
 
-    val normalFile = new File(normalDir, "part-00000")
-    val normalContent = sc.sequenceFile[String, String](normalDir).collect
-    assert(normalContent === Array.fill(100)(("abc", "abc")))
+      val normalFile = new File(normalDir, "part-00000")
+      val normalContent = sc.sequenceFile[String, String](normalDir).collect
+      assert(normalContent === Array.fill(100)(("abc", "abc")))
 
-    val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
-    val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect
-    assert(compressedContent === Array.fill(100)(("abc", "abc")))
+      val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
+      val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect
+      assert(compressedContent === Array.fill(100)(("abc", "abc")))
 
-    assert(compressedFile.length < normalFile.length)
+      assert(compressedFile.length < normalFile.length)
+    }
   }
 
+  // Hadoop "gzip" and "zstd" codecs require native library installed for sequence files
+  // "snappy" and "lz4" codecs do not work due to SPARK-36669 and SPARK-36681.
+  Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2")).foreach {
+    case (codec, codecName) =>
+      runSequenceFileCodecTest(codec, codecName)
+  }
+
   test("SequenceFile with writable key") {