spark-instrumented-optimizer/core
Liang-Chi Hsieh e3ac56c8f4
[SPARK-31484][CORE] Add stage attempt number to temp checkpoint filename to avoid file already existing exception
### What changes were proposed in this pull request?

To avoid file already existing exception when creating checkpoint file, this PR proposes to add stage attempt number to temporary checkpoint file.

### Why are the changes needed?

On our production clusters, we have seen checkpointing failure. The failed stage can possibly leave partial written checkpoint file, the task of retried stage to write checkpoint file could fail due to`FileAlreadyExistsException` when creating the same file, like
```
org.apache.hadoop.fs.FileAlreadyExistsException: /path_to_checkpoint/rdd-114/.part-03154-attempt-0 for client xxx.xxx.xxx.xxx already exists
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.startFile(FSDirWriteFileOp.java:359)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2353)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2273)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:728)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:413)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:851)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:794)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2490)

	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
	at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:270)
	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1263)
	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1205)
	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:473)
	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:470)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:470)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:411)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:872)
	at org.apache.spark.rdd.ReliableCheckpointRDD$.writePartitionToCheckpointFile(ReliableCheckpointRDD.scala:204)
```

### Does this PR introduce any user-facing change?

Yes. Users won't see checkpoint file already existing exception after this PR.

### How was this patch tested?

Add unit test.

Closes #28255 from viirya/delete-temp-checkpoint.

Authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-04-19 09:11:17 -07:00
..
benchmarks [SPARK-29576][CORE] Use Spark's CompressionCodec for Ser/Deser of MapOutputStatus 2019-10-23 18:17:37 -07:00
src [SPARK-31484][CORE] Add stage attempt number to temp checkpoint filename to avoid file already existing exception 2020-04-19 09:11:17 -07:00
pom.xml [SPARK-30950][BUILD] Setting version to 3.1.0-SNAPSHOT 2020-02-25 19:44:31 -08:00