From 6eb7559901de37e412707e707fbd6ef9c0ee8874 Mon Sep 17 00:00:00 2001
From: Chao Sun
Date: Wed, 22 Sep 2021 00:12:29 -0700
Subject: [PATCH] [SPARK-36820][SQL] Disable tests related to LZ4 for Hadoop 2.7 profile

### What changes were proposed in this pull request?

Disable tests related to LZ4 in `FileSourceCodecSuite`, `FileSuite` and `ParquetCompressionCodecPrecedenceSuite` when using the `hadoop-2.7` profile.

### Why are the changes needed?

At the moment, parquet-mr uses the LZ4 compression codec provided by Hadoop, and only since HADOOP-17292 (in 3.3.1/3.4.0) has the latter added `lz4-java` to remove the restriction that the codec can only be run with the native library. As a consequence, these tests fail when using the `hadoop-2.7` profile.

### Does this PR introduce _any_ user-facing change?

No, it only affects tests.

### How was this patch tested?

Existing tests.

Closes #34064 from sunchao/SPARK-36820.

Authored-by: Chao Sun
Signed-off-by: Liang-Chi Hsieh
---
 .../scala/org/apache/spark/util/VersionUtils.scala    |  7 +++++++
 core/src/test/scala/org/apache/spark/FileSuite.scala  | 10 ++++++----
 .../execution/datasources/FileSourceCodecSuite.scala  |  9 +++++++--
 .../ParquetCompressionCodecPrecedenceSuite.scala      |  6 +++++-
 4 files changed, 25 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/VersionUtils.scala b/core/src/main/scala/org/apache/spark/util/VersionUtils.scala
index e97d1c9393..362b22b788 100644
--- a/core/src/main/scala/org/apache/spark/util/VersionUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/VersionUtils.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.util
 
+import org.apache.hadoop.util.VersionInfo
+
 /**
  * Utilities for working with Spark version strings
  */
@@ -26,6 +28,11 @@ private[spark] object VersionUtils {
   private val shortVersionRegex = """^(\d+\.\d+\.\d+)(.*)?$""".r
   private val majorMinorPatchRegex = """^(\d+)(?:\.(\d+)(?:\.(\d+)(?:[.-].*)?)?)?$""".r
 
+  /**
+   * Whether the Hadoop version used by Spark is 3.x
+   */
+  def isHadoop3: Boolean = majorVersion(VersionInfo.getVersion) == 3
+
   /**
    * Given a Spark version string, return the major version number.
    * E.g., for 2.0.1-SNAPSHOT, return 2.
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 12022846fe..f1f2b4fc70 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -38,7 +38,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Utils, VersionUtils}
 
 class FileSuite extends SparkFunSuite with LocalSparkContext {
   var tempDir: File = _
@@ -137,9 +137,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 
   // Hadoop "gzip" and "zstd" codecs require native library installed for sequence files
   // "snappy" codec does not work due to SPARK-36681.
-  Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2"), (new Lz4Codec(), "lz4"))
-    .foreach { case (codec, codecName) =>
-      runSequenceFileCodecTest(codec, codecName)
+  val codecs = Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2")) ++ {
+    if (VersionUtils.isHadoop3) Seq((new Lz4Codec(), "lz4")) else Seq()
+  }
+  codecs.foreach { case (codec, codecName) =>
+    runSequenceFileCodecTest(codec, codecName)
   }
 
   test("SequenceFile with writable key") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
index ac1fd1c0b1..fdb698f582 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+import org.apache.spark.util.VersionUtils
 
 trait FileSourceCodecSuite extends QueryTest with SQLTestUtils with SharedSparkSession {
 
@@ -58,8 +59,12 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
   // Exclude "lzo" because it is GPL-licenced so not included in Hadoop.
   // Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available
   // on Maven Central.
-  override protected def availableCodecs: Seq[String] =
-    Seq("none", "uncompressed", "snappy", "lz4", "gzip", "zstd")
+  override protected def availableCodecs: Seq[String] = {
+    Seq("none", "uncompressed", "snappy", "gzip", "zstd") ++ {
+      // Exclude "lz4" for Hadoop 2.x profile since the lz4-java support is only in 3.x
+      if (VersionUtils.isHadoop3) Seq("lz4") else Seq()
+    }
+  }
 }
 
 class OrcCodecSuite extends FileSourceCodecSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
index 417d59125c..7e6dae9a66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
@@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.VersionUtils
 
 class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession {
   test("Test `spark.sql.parquet.compression.codec` config") {
@@ -105,7 +106,10 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar
 
   test("Create parquet table with compression") {
     Seq(true, false).foreach { isPartitioned =>
-      Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "LZ4", "ZSTD").foreach { compressionCodec =>
+      val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD") ++ {
+        if (VersionUtils.isHadoop3) Seq("LZ4") else Seq()
+      }
+      codecs.foreach { compressionCodec =>
         checkCompressionCodec(compressionCodec, isPartitioned)
       }
     }
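
Note (not part of the patch): a minimal, standalone sketch of how the Hadoop-3 gate used above resolves at runtime. `Lz4GateSketch` and the simplified major-version parse are illustrative assumptions, not Spark code; the actual check in the patch is `VersionUtils.isHadoop3`, which delegates to `VersionUtils.majorVersion`.

```scala
// Illustrative sketch only: mirrors the gating pattern the patch applies in the test suites.
// Assumes Hadoop is on the classpath; VersionInfo.getVersion reports the Hadoop version
// Spark was built against (e.g. "2.7.4" under the hadoop-2.7 profile).
import org.apache.hadoop.util.VersionInfo

object Lz4GateSketch {
  def main(args: Array[String]): Unit = {
    val hadoopVersion = VersionInfo.getVersion
    // Simplified stand-in for VersionUtils.isHadoop3: compare the leading major version to 3.
    val isHadoop3 = hadoopVersion.takeWhile(_ != '.').toInt == 3
    // LZ4 works without the native library only once Hadoop bundles lz4-java (HADOOP-17292, 3.3.1+).
    val codecs = Seq("none", "uncompressed", "snappy", "gzip", "zstd") ++
      (if (isHadoop3) Seq("lz4") else Nil)
    println(s"Hadoop $hadoopVersion -> Parquet codecs under test: ${codecs.mkString(", ")}")
  }
}
```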