[SPARK-31484][CORE] Add stage attempt number to temp checkpoint filename to avoid file already existing exception

### What changes were proposed in this pull request?

To avoid a file-already-exists exception when creating the checkpoint file, this PR proposes to add the stage attempt number to the temporary checkpoint file name.
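
As an illustration, here is a minimal sketch of the naming change (the helper names are hypothetical; the real logic lives in `ReliableCheckpointRDD.writePartitionToCheckpointFile`, shown in the diff below). The key point is that `TaskContext.attemptNumber()` restarts from 0 for the tasks of a resubmitted stage, while `TaskContext.stageAttemptNumber()` increases, so including the latter keeps the temporary path unique across stage retries:

```scala
// Hypothetical helpers for illustration; `finalName` would be the final
// checkpoint part-file name, e.g. "part-03154".
def tempNameBefore(finalName: String, taskAttempt: Int): String =
  s".$finalName-attempt-$taskAttempt"

def tempNameAfter(finalName: String, stageAttempt: Int, taskAttempt: Int): String =
  s".$finalName-attempt-$stageAttempt-$taskAttempt"

// Task attempt numbering restarts at 0 in a resubmitted stage, so the old
// scheme produces the same temp name for both stage attempts...
assert(tempNameBefore("part-03154", 0) == tempNameBefore("part-03154", 0))
// ...while the new scheme stays unique because the stage attempt differs.
assert(tempNameAfter("part-03154", 0, 0) != tempNameAfter("part-03154", 1, 0))
```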

### Why are the changes needed?

On our production clusters, we have seen checkpointing failures. A failed stage can leave a partially written temporary checkpoint file behind. Because the temporary file name encodes only the task attempt number, which restarts at 0 in a resubmitted stage, a task of the retried stage can fail with a `FileAlreadyExistsException` when it tries to create the same file, for example:
```
org.apache.hadoop.fs.FileAlreadyExistsException: /path_to_checkpoint/rdd-114/.part-03154-attempt-0 for client xxx.xxx.xxx.xxx already exists
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.startFile(FSDirWriteFileOp.java:359)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2353)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2273)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:728)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:413)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:851)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:794)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2490)

	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
	at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:270)
	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1263)
	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1205)
	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:473)
	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:470)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:470)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:411)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:872)
	at org.apache.spark.rdd.ReliableCheckpointRDD$.writePartitionToCheckpointFile(ReliableCheckpointRDD.scala:204)
```

### Does this PR introduce any user-facing change?

Yes. After this PR, users will no longer hit `FileAlreadyExistsException` when a retried stage rewrites its checkpoint files.
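
For context, a minimal sketch of how reliable checkpointing is triggered from user code (the master, app name, and checkpoint directory below are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("checkpoint-example")
    val sc = new SparkContext(conf)
    // Any HDFS or local directory works; the path here is illustrative.
    sc.setCheckpointDir("/tmp/checkpoints")
    val rdd = sc.makeRDD(1 to 1000, numSlices = 8).map(_ * 2)
    rdd.checkpoint() // mark the RDD for reliable checkpointing
    rdd.count()      // the first action materializes the checkpoint files
    sc.stop()
  }
}
```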

### How was this patch tested?

Add unit test.

Closes #28255 from viirya/delete-temp-checkpoint.

Authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

In `core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala`:

```diff
@@ -199,8 +199,8 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     val finalOutputName = ReliableCheckpointRDD.checkpointFileName(ctx.partitionId())
     val finalOutputPath = new Path(outputDir, finalOutputName)
-    val tempOutputPath =
-      new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")
+    val tempOutputPath = new Path(outputDir,
+      s".$finalOutputName-attempt-${ctx.stageAttemptNumber()}-${ctx.attemptNumber()}")
     val bufferSize = env.conf.get(BUFFER_SIZE)
@@ -218,11 +218,16 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     }
     val serializer = env.serializer.newInstance()
     val serializeStream = serializer.serializeStream(fileOutputStream)
-    Utils.tryWithSafeFinally {
+    Utils.tryWithSafeFinallyAndFailureCallbacks {
       serializeStream.writeAll(iterator)
-    } {
+    } (catchBlock = {
+      val deleted = fs.delete(tempOutputPath, false)
+      if (!deleted) {
+        logInfo(s"Failed to delete tempOutputPath $tempOutputPath.")
+      }
+    }, finallyBlock = {
       serializeStream.close()
-    }
+    })
     if (!fs.rename(tempOutputPath, finalOutputPath)) {
       if (!fs.exists(finalOutputPath)) {
```
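
The switch from `Utils.tryWithSafeFinally` to `Utils.tryWithSafeFinallyAndFailureCallbacks` above is what deletes the partial temp file when the write fails. A simplified sketch of the utility's semantics, assuming the real helper's extra bookkeeping (task-failure callbacks, suppression of secondary exceptions from the cleanup blocks) is omitted:

```scala
// Simplified sketch only; not Spark's actual implementation of
// Utils.tryWithSafeFinallyAndFailureCallbacks.
def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)
    (catchBlock: => Unit = (), finallyBlock: => Unit = ()): T = {
  try {
    block
  } catch {
    case t: Throwable =>
      catchBlock // runs only on failure: here, delete the partial temp file
      throw t
  } finally {
    finallyBlock // runs on success and failure: here, close the stream
  }
}
```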

In `core/src/test/scala/org/apache/spark/CheckpointSuite.scala` (`CheckpointStorageSuite`):

```diff
@@ -28,6 +28,7 @@ import org.apache.spark.internal.config.CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_T
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.rdd._
+import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
 import org.apache.spark.util.Utils
@@ -642,4 +643,29 @@ class CheckpointStorageSuite extends SparkFunSuite with LocalSparkContext {
       assert(preferredLoc == checkpointedRDD.cachedPreferredLocations.get(partiton))
     }
   }
+
+  test("SPARK-31484: checkpoint should not fail in retry") {
+    withTempDir { checkpointDir =>
+      val conf = new SparkConf()
+        .set(UI_ENABLED.key, "false")
+      sc = new SparkContext("local[1]", "test", conf)
+      sc.setCheckpointDir(checkpointDir.toString)
+      val rdd = sc.makeRDD(1 to 200, numSlices = 4).repartition(1).mapPartitions { iter =>
+        iter.map { i =>
+          if (i > 100 && TaskContext.get().stageAttemptNumber() == 0) {
+            // throw new SparkException("Make first attempt failed.")
+            // Throw FetchFailedException to explicitly trigger stage resubmission.
+            // A normal exception will only trigger task resubmission in the same stage.
+            throw new FetchFailedException(null, 0, 0L, 0, 0, "Fake")
+          } else {
+            i
+          }
+        }
+      }
+      rdd.checkpoint()
+      assert(rdd.collect().toSeq === (1 to 200))
+      // Verify that the RDD is checkpointed
+      assert(rdd.firstParent.isInstanceOf[ReliableCheckpointRDD[_]])
+    }
+  }
 }
```