[SPARK-36820][SQL] Disable tests related to LZ4 for Hadoop 2.7 profile

### What changes were proposed in this pull request?

Disable tests related to LZ4 in `FileSourceCodecSuite`, `FileSuite` and `ParquetCompressionCodecPrecedenceSuite` when using the `hadoop-2.7` profile.

### Why are the changes needed?

At the moment, parquet-mr uses the LZ4 compression codec provided by Hadoop, and only since HADOOP-17292 (in 3.3.1/3.4.0) has the latter added `lz4-java` to remove the restriction that the codec can only be run with the native library. As a consequence, the tests will fail when using the `hadoop-2.7` profile.

### Does this PR introduce _any_ user-facing change?

No, this is a test-only change.

### How was this patch tested?

Existing tests

Closes #34064 from sunchao/SPARK-36820.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
This commit is contained in:
Chao Sun 2021-09-22 00:12:29 -07:00 committed by Liang-Chi Hsieh
parent 079a9c5292
commit 6eb7559901
4 changed files with 25 additions and 7 deletions

View file

@ -17,6 +17,8 @@
package org.apache.spark.util package org.apache.spark.util
import org.apache.hadoop.util.VersionInfo
/** /**
* Utilities for working with Spark version strings * Utilities for working with Spark version strings
*/ */
@ -26,6 +28,11 @@ private[spark] object VersionUtils {
private val shortVersionRegex = """^(\d+\.\d+\.\d+)(.*)?$""".r private val shortVersionRegex = """^(\d+\.\d+\.\d+)(.*)?$""".r
private val majorMinorPatchRegex = """^(\d+)(?:\.(\d+)(?:\.(\d+)(?:[.-].*)?)?)?$""".r private val majorMinorPatchRegex = """^(\d+)(?:\.(\d+)(?:\.(\d+)(?:[.-].*)?)?)?$""".r
/**
* Whether the Hadoop version used by Spark is 3.x
*/
def isHadoop3: Boolean = majorVersion(VersionInfo.getVersion) == 3
/** /**
* Given a Spark version string, return the major version number. * Given a Spark version string, return the major version number.
* E.g., for 2.0.1-SNAPSHOT, return 2. * E.g., for 2.0.1-SNAPSHOT, return 2.

View file

@ -38,7 +38,7 @@ import org.apache.spark.internal.config._
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD} import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
import org.apache.spark.serializer.KryoSerializer import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils import org.apache.spark.util.{Utils, VersionUtils}
class FileSuite extends SparkFunSuite with LocalSparkContext { class FileSuite extends SparkFunSuite with LocalSparkContext {
var tempDir: File = _ var tempDir: File = _
@ -137,9 +137,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
// Hadoop "gzip" and "zstd" codecs require native library installed for sequence files // Hadoop "gzip" and "zstd" codecs require native library installed for sequence files
// "snappy" codec does not work due to SPARK-36681. // "snappy" codec does not work due to SPARK-36681.
Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2"), (new Lz4Codec(), "lz4")) val codecs = Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2")) ++ {
.foreach { case (codec, codecName) => if (VersionUtils.isHadoop3) Seq((new Lz4Codec(), "lz4")) else Seq()
runSequenceFileCodecTest(codec, codecName) }
codecs.foreach { case (codec, codecName) =>
runSequenceFileCodecTest(codec, codecName)
} }
test("SequenceFile with writable key") { test("SequenceFile with writable key") {

View file

@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.QueryTest import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
import org.apache.spark.util.VersionUtils
trait FileSourceCodecSuite extends QueryTest with SQLTestUtils with SharedSparkSession { trait FileSourceCodecSuite extends QueryTest with SQLTestUtils with SharedSparkSession {
@ -58,8 +59,12 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
// Exclude "lzo" because it is GPL-licenced so not included in Hadoop. // Exclude "lzo" because it is GPL-licenced so not included in Hadoop.
// Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available // Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available
// on Maven Central. // on Maven Central.
override protected def availableCodecs: Seq[String] = override protected def availableCodecs: Seq[String] = {
Seq("none", "uncompressed", "snappy", "lz4", "gzip", "zstd") Seq("none", "uncompressed", "snappy", "gzip", "zstd") ++ {
// Exclude "lz4" for Hadoop 2.x profile since the lz4-java support is only in 3.x
if (VersionUtils.isHadoop3) Seq("lz4") else Seq()
}
}
} }
class OrcCodecSuite extends FileSourceCodecSuite { class OrcCodecSuite extends FileSourceCodecSuite {

View file

@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.VersionUtils
class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession { class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession {
test("Test `spark.sql.parquet.compression.codec` config") { test("Test `spark.sql.parquet.compression.codec` config") {
@ -105,7 +106,10 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar
test("Create parquet table with compression") { test("Create parquet table with compression") {
Seq(true, false).foreach { isPartitioned => Seq(true, false).foreach { isPartitioned =>
Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "LZ4", "ZSTD").foreach { compressionCodec => val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD") ++ {
if (VersionUtils.isHadoop3) Seq("LZ4") else Seq()
}
codecs.foreach { compressionCodec =>
checkCompressionCodec(compressionCodec, isPartitioned) checkCompressionCodec(compressionCodec, isPartitioned)
} }
} }