[SPARK-20868][CORE] UnsafeShuffleWriter should verify the position after FileChannel.transferTo
## What changes were proposed in this pull request? A long time ago we fixed a [bug](https://issues.apache.org/jira/browse/SPARK-3948) in the shuffle writer involving `FileChannel.transferTo`. We were not very confident about that fix, so we added a position check after the write to try to discover the bug earlier. However, this check is missing in the new `UnsafeShuffleWriter`; this PR adds it. https://issues.apache.org/jira/browse/SPARK-18105 may be related to that `FileChannel.transferTo` bug; hopefully we can find the root cause after adding this position check. ## How was this patch tested? N/A Author: Wenchen Fan <wenchen@databricks.com> Closes #18091 from cloud-fan/shuffle.
This commit is contained in:
parent
a97c497045
commit
d9ad78908f
|
@ -422,17 +422,14 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
|
|||
for (int partition = 0; partition < numPartitions; partition++) {
|
||||
for (int i = 0; i < spills.length; i++) {
|
||||
final long partitionLengthInSpill = spills[i].partitionLengths[partition];
|
||||
long bytesToTransfer = partitionLengthInSpill;
|
||||
final FileChannel spillInputChannel = spillInputChannels[i];
|
||||
final long writeStartTime = System.nanoTime();
|
||||
while (bytesToTransfer > 0) {
|
||||
final long actualBytesTransferred = spillInputChannel.transferTo(
|
||||
spillInputChannelPositions[i],
|
||||
bytesToTransfer,
|
||||
mergedFileOutputChannel);
|
||||
spillInputChannelPositions[i] += actualBytesTransferred;
|
||||
bytesToTransfer -= actualBytesTransferred;
|
||||
}
|
||||
Utils.copyFileStreamNIO(
|
||||
spillInputChannel,
|
||||
mergedFileOutputChannel,
|
||||
spillInputChannelPositions[i],
|
||||
partitionLengthInSpill);
|
||||
spillInputChannelPositions[i] += partitionLengthInSpill;
|
||||
writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
|
||||
bytesWrittenToMergedFile += partitionLengthInSpill;
|
||||
partitionLengths[partition] += partitionLengthInSpill;
|
||||
|
|
|
@ -22,7 +22,7 @@ import java.lang.management.{LockInfo, ManagementFactory, MonitorInfo, ThreadInf
|
|||
import java.math.{MathContext, RoundingMode}
|
||||
import java.net._
|
||||
import java.nio.ByteBuffer
|
||||
import java.nio.channels.Channels
|
||||
import java.nio.channels.{Channels, FileChannel}
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.nio.file.{Files, Paths}
|
||||
import java.util.{Locale, Properties, Random, UUID}
|
||||
|
@ -60,7 +60,6 @@ import org.apache.spark.internal.Logging
|
|||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.network.util.JavaUtils
|
||||
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
|
||||
import org.apache.spark.util.logging.RollingFileAppender
|
||||
|
||||
/** CallSite represents a place in user code. It can have a short and a long form. */
|
||||
private[spark] case class CallSite(shortForm: String, longForm: String)
|
||||
|
@ -319,41 +318,22 @@ private[spark] object Utils extends Logging {
|
|||
* copying is disabled by default unless explicitly set transferToEnabled as true,
|
||||
* the parameter transferToEnabled should be configured by spark.file.transferTo = [true|false].
|
||||
*/
|
||||
def copyStream(in: InputStream,
|
||||
out: OutputStream,
|
||||
closeStreams: Boolean = false,
|
||||
transferToEnabled: Boolean = false): Long =
|
||||
{
|
||||
var count = 0L
|
||||
def copyStream(
|
||||
in: InputStream,
|
||||
out: OutputStream,
|
||||
closeStreams: Boolean = false,
|
||||
transferToEnabled: Boolean = false): Long = {
|
||||
tryWithSafeFinally {
|
||||
if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
|
||||
&& transferToEnabled) {
|
||||
// When both streams are File stream, use transferTo to improve copy performance.
|
||||
val inChannel = in.asInstanceOf[FileInputStream].getChannel()
|
||||
val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
|
||||
val initialPos = outChannel.position()
|
||||
val size = inChannel.size()
|
||||
|
||||
// In case transferTo method transferred less data than we have required.
|
||||
while (count < size) {
|
||||
count += inChannel.transferTo(count, size - count, outChannel)
|
||||
}
|
||||
|
||||
// Check the position after transferTo loop to see if it is in the right position and
|
||||
// give user information if not.
|
||||
// Position will not be increased to the expected length after calling transferTo in
|
||||
// kernel version 2.6.32, this issue can be seen in
|
||||
// https://bugs.openjdk.java.net/browse/JDK-7052359
|
||||
// This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948).
|
||||
val finalPos = outChannel.position()
|
||||
assert(finalPos == initialPos + size,
|
||||
s"""
|
||||
|Current position $finalPos do not equal to expected position ${initialPos + size}
|
||||
|after transferTo, please check your kernel version to see if it is 2.6.32,
|
||||
|this is a kernel bug which will lead to unexpected behavior when using transferTo.
|
||||
|You can set spark.file.transferTo = false to disable this NIO feature.
|
||||
""".stripMargin)
|
||||
copyFileStreamNIO(inChannel, outChannel, 0, size)
|
||||
size
|
||||
} else {
|
||||
var count = 0L
|
||||
val buf = new Array[Byte](8192)
|
||||
var n = 0
|
||||
while (n != -1) {
|
||||
|
@ -363,8 +343,8 @@ private[spark] object Utils extends Logging {
|
|||
count += n
|
||||
}
|
||||
}
|
||||
count
|
||||
}
|
||||
count
|
||||
} {
|
||||
if (closeStreams) {
|
||||
try {
|
||||
|
@ -376,6 +356,37 @@ private[spark] object Utils extends Logging {
|
|||
}
|
||||
}
|
||||
|
||||
def copyFileStreamNIO(
|
||||
input: FileChannel,
|
||||
output: FileChannel,
|
||||
startPosition: Long,
|
||||
bytesToCopy: Long): Unit = {
|
||||
val initialPos = output.position()
|
||||
var count = 0L
|
||||
// In case transferTo method transferred less data than we have required.
|
||||
while (count < bytesToCopy) {
|
||||
count += input.transferTo(count + startPosition, bytesToCopy - count, output)
|
||||
}
|
||||
assert(count == bytesToCopy,
|
||||
s"request to copy $bytesToCopy bytes, but actually copied $count bytes.")
|
||||
|
||||
// Check the position after transferTo loop to see if it is in the right position and
|
||||
// give user information if not.
|
||||
// Position will not be increased to the expected length after calling transferTo in
|
||||
// kernel version 2.6.32, this issue can be seen in
|
||||
// https://bugs.openjdk.java.net/browse/JDK-7052359
|
||||
// This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948).
|
||||
val finalPos = output.position()
|
||||
val expectedPos = initialPos + bytesToCopy
|
||||
assert(finalPos == expectedPos,
|
||||
s"""
|
||||
|Current position $finalPos do not equal to expected position $expectedPos
|
||||
|after transferTo, please check your kernel version to see if it is 2.6.32,
|
||||
|this is a kernel bug which will lead to unexpected behavior when using transferTo.
|
||||
|You can set spark.file.transferTo = false to disable this NIO feature.
|
||||
""".stripMargin)
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a URI container information used for authentication.
|
||||
* This also sets the default authenticator to properly negotiation the
|
||||
|
|
Loading…
Reference in a new issue