[SPARK-36737][BUILD][CORE][SQL][SS] Upgrade Apache commons-io to 2.11.0 and revert change of SPARK-36456

### What changes were proposed in this pull request?
SPARK-36456 changed the code to use `JavaUtils.closeQuietly` instead of `IOUtils.closeQuietly`, but the two methods differ slightly in their default behavior: both swallow the `IOException`, but the former logs it at ERROR level while the latter does not log it by default.
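For context, a minimal sketch (not part of this PR; the object name `CloseQuietlyDemo` is made up for illustration) of the difference in default behavior between the two helpers:

```scala
import java.io.{Closeable, IOException}

import org.apache.commons.io.IOUtils
import org.apache.spark.network.util.JavaUtils

object CloseQuietlyDemo {
  def main(args: Array[String]): Unit = {
    // A resource whose close() always fails, to make the difference observable.
    val failing: Closeable = new Closeable {
      override def close(): Unit = throw new IOException("boom")
    }

    // commons-io: swallows the IOException and logs nothing by default.
    IOUtils.closeQuietly(failing)

    // Spark's JavaUtils: also swallows the IOException, but logs it at ERROR level first.
    JavaUtils.closeQuietly(failing)
  }
}
```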

The `Apache commons-io` community decided to retain the `IOUtils.closeQuietly` method in the [new version](75f20dca72/src/main/java/org/apache/commons/io/IOUtils.java (L465-L467)) and removed its deprecation annotation; the change has been released in version 2.11.0.

So this PR upgrades `Apache commons-io` to 2.11.0 and reverts the change of SPARK-36456 to maintain the original behavior (no error log is printed).

### Why are the changes needed?

1. Upgrade `Apache commons-io` to 2.11.0 to use the non-deprecated `closeQuietly` API. Other changes in `Apache commons-io` are detailed in the [commons-io changes report](https://commons.apache.org/proper/commons-io/changes-report.html#a2.11.0).

2. Revert the change of SPARK-36456 to maintain the original `IOUtils.closeQuietly` API behavior (no error log is printed).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass the Jenkins or GitHub Actions builds.

Closes #33977 from LuciferYang/upgrade-commons-io.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
yangjie01 2021-09-14 21:16:58 +09:00 committed by Jungtaek Lim
parent f71f37755d
commit 119ddd7e95
12 changed files with 24 additions and 25 deletions

View file

@@ -35,6 +35,7 @@ import scala.util.control.NonFatal
 import com.codahale.metrics.{MetricRegistry, MetricSet}
 import com.google.common.cache.CacheBuilder
+import org.apache.commons.io.IOUtils
 import org.apache.spark._
 import org.apache.spark.errors.SparkCoreErrors
@@ -51,7 +52,7 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle._
 import org.apache.spark.network.shuffle.checksum.{Cause, ShuffleChecksumHelper}
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
-import org.apache.spark.network.util.{JavaUtils, TransportConf}
+import org.apache.spark.network.util.TransportConf
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
@@ -341,7 +342,7 @@ private[spark] class BlockManager(
         false
       }
     } finally {
-      JavaUtils.closeQuietly(inputStream)
+      IOUtils.closeQuietly(inputStream)
     }
   }

View file

@@ -29,6 +29,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.util.{Failure, Success}
 import io.netty.util.internal.OutOfDirectMemoryError
+import org.apache.commons.io.IOUtils
 import org.roaringbitmap.RoaringBitmap
 import org.apache.spark.{MapOutputTracker, TaskContext}
@@ -38,7 +39,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.shuffle._
 import org.apache.spark.network.shuffle.checksum.{Cause, ShuffleChecksumHelper}
-import org.apache.spark.network.util.{JavaUtils, NettyUtils, TransportConf}
+import org.apache.spark.network.util.{NettyUtils, TransportConf}
 import org.apache.spark.shuffle.ShuffleReadMetricsReporter
 import org.apache.spark.util.{CompletionIterator, TaskCompletionListener, Utils}
@@ -1303,7 +1304,7 @@ private class BufferReleasingInputStream(
       val diagnosisResponse = checkedInOpt.map { checkedIn =>
         iterator.diagnoseCorruption(checkedIn, address, blockId)
       }
-      JavaUtils.closeQuietly(this)
+      IOUtils.closeQuietly(this)
       // We'd never retry the block whatever the cause is since the block has been
       // partially consumed by downstream RDDs.
      iterator.throwFetchFailedException(blockId, mapIndex, address, e, diagnosisResponse)

View file

@@ -3149,8 +3149,8 @@ private[spark] object Utils extends Logging {
       logInfo(s"Unzipped from $dfsZipFile\n\t${files.mkString("\n\t")}")
     } finally {
       // Close everything no matter what happened
-      JavaUtils.closeQuietly(in)
-      JavaUtils.closeQuietly(out)
+      IOUtils.closeQuietly(in)
+      IOUtils.closeQuietly(out)
     }
     files.toSeq
   }

View file

@@ -25,7 +25,6 @@ import org.apache.commons.io.IOUtils
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config
-import org.apache.spark.network.util.JavaUtils
 /**
  * Continuously appends data from input stream into the given file, and rolls
@@ -95,8 +94,8 @@ private[spark] class RollingFileAppender(
       gzOutputStream.close()
       activeFile.delete()
     } finally {
-      JavaUtils.closeQuietly(inputStream)
-      JavaUtils.closeQuietly(gzOutputStream)
+      IOUtils.closeQuietly(inputStream)
+      IOUtils.closeQuietly(gzOutputStream)
     }
   } else {
     Files.move(activeFile, rolloverFile)

View file

@@ -35,7 +35,6 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.{config, Logging}
-import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.logging.{FileAppender, RollingFileAppender, SizeBasedRollingPolicy, TimeBasedRollingPolicy}
 class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
@@ -381,7 +380,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       try {
         IOUtils.toString(inputStream, StandardCharsets.UTF_8)
       } finally {
-        JavaUtils.closeQuietly(inputStream)
+        IOUtils.closeQuietly(inputStream)
       }
     } else {
       Files.toString(file, StandardCharsets.UTF_8)

View file

@@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.launcher.SparkLauncher
-import org.apache.spark.network.util.{ByteUnit, JavaUtils}
+import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.util.io.ChunkedByteBufferInputStream
@@ -245,8 +245,8 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       assert(mergedStream.read() === -1)
       assert(byteBufferInputStream.chunkedByteBuffer === null)
     } finally {
-      JavaUtils.closeQuietly(mergedStream)
-      JavaUtils.closeQuietly(in)
+      IOUtils.closeQuietly(mergedStream)
+      IOUtils.closeQuietly(in)
     }
   }
 }

View file

@@ -35,7 +35,7 @@ commons-compiler/3.0.16//commons-compiler-3.0.16.jar
 commons-compress/1.21//commons-compress-1.21.jar
 commons-crypto/1.1.0//commons-crypto-1.1.0.jar
 commons-dbcp/1.4//commons-dbcp-1.4.jar
-commons-io/2.8.0//commons-io-2.8.0.jar
+commons-io/2.11.0//commons-io-2.11.0.jar
 commons-lang/2.6//commons-lang-2.6.jar
 commons-lang3/3.12.0//commons-lang3-3.12.0.jar
 commons-logging/1.1.3//commons-logging-1.1.3.jar

View file

@@ -173,7 +173,7 @@
     <netlib.ludovic.dev.version>2.2.0</netlib.ludovic.dev.version>
     <commons-codec.version>1.15</commons-codec.version>
     <commons-compress.version>1.21</commons-compress.version>
-    <commons-io.version>2.8.0</commons-io.version>
+    <commons-io.version>2.11.0</commons-io.version>
     <!-- org.apache.commons/commons-lang/-->
     <commons-lang2.version>2.6</commons-lang2.version>
     <!-- org.apache.commons/commons-lang3/-->

View file

@@ -22,12 +22,12 @@ import java.nio.charset.StandardCharsets
 import scala.reflect.ClassTag
+import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs._
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -147,7 +147,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       throw new IllegalStateException(
         s"Failed to read log file $batchMetadataFile. ${ise.getMessage}", ise)
     } finally {
-      JavaUtils.closeQuietly(input)
+      IOUtils.closeQuietly(input)
     }
   } else {
     throw QueryExecutionErrors.batchMetadataFileNotFoundError(batchMetadataFile)

View file

@@ -22,13 +22,13 @@ import java.nio.charset.StandardCharsets
 import scala.util.control.NonFatal
+import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, Path}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
@@ -63,7 +63,7 @@ object StreamMetadata extends Logging {
       logError(s"Error reading stream metadata from $metadataFile", e)
       throw e
     } finally {
-      JavaUtils.closeQuietly(input)
+      IOUtils.closeQuietly(input)
     }
   } else None
 }

View file

@@ -27,13 +27,13 @@ import scala.collection.mutable
 import scala.util.control.NonFatal
 import com.google.common.io.ByteStreams
+import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
-import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
@@ -542,7 +542,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       rawStream: CancellableFSDataOutputStream): Unit = {
     try {
       if (rawStream != null) rawStream.cancel()
-      JavaUtils.closeQuietly(compressedStream)
+      IOUtils.closeQuietly(compressedStream)
     } catch {
       case e: FSError if e.getCause.isInstanceOf[IOException] =>
         // Closing the compressedStream causes the stream to write/flush flush data into the

View file

@@ -38,7 +38,6 @@ import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
 import org.apache.spark.util.Utils
@@ -459,8 +458,8 @@ class RocksDBFileManager(
       throw e
     } finally {
       // Close everything no matter what happened
-      JavaUtils.closeQuietly(in)
-      JavaUtils.closeQuietly(zout)
+      IOUtils.closeQuietly(in)
+      IOUtils.closeQuietly(zout)
     }
   }