[SPARK-29469][SHUFFLE] Avoid retries by RetryingBlockFetcher when ExternalBlockStoreClient is closed
### What changes were proposed in this pull request? When ExternalBlockStoreClient was closed, retries from RetryingBlockFetcher will cause NPE. This proposes to skip retries by RetryingBlockFetcher when ExternalBlockStoreClient is closed. ### Why are the changes needed? When ExternalBlockStoreClient was closed, retries from RetryingBlockFetcher will cause NPE: ``` 2019-10-14 20:06:16 ERROR RetryingBlockFetcher:143 - Exception while beginning fetch of 2 outstanding blocks (after 3 retries) java.lang.NullPointerException at org.apache.spark.network.shuffle.ExternalShuffleClient.lambda$fetchBlocks$0(ExternalShuffleClient.java:100) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141) at org.apache.spark.network.shuffle.RetryingBlockFetcher.lambda$initiateRetry$0(RetryingBlockFetcher.java:169) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) ``` It was happened after BlockManager and ExternalBlockStoreClient was closed due to previous errors. In this cases, RetryingBlockFetcher does not need to retry. This NPE is harmless for job execution, but is a source of misleading when looking at log. Especially for end-users. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Existing tests. Closes #26115 from viirya/SPARK-29469. Lead-authored-by: Liang-Chi Hsieh <liangchi@uber.com> Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
e00344edc1
commit
93e71e60e6
|
@ -53,7 +53,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
|
|||
private final SecretKeyHolder secretKeyHolder;
|
||||
private final long registrationTimeoutMs;
|
||||
|
||||
protected TransportClientFactory clientFactory;
|
||||
protected volatile TransportClientFactory clientFactory;
|
||||
protected String appId;
|
||||
|
||||
/**
|
||||
|
@ -102,9 +102,14 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
|
|||
try {
|
||||
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
|
||||
(blockIds1, listener1) -> {
|
||||
TransportClient client = clientFactory.createClient(host, port);
|
||||
new OneForOneBlockFetcher(client, appId, execId,
|
||||
blockIds1, listener1, conf, downloadFileManager).start();
|
||||
// Unless this client is closed.
|
||||
if (clientFactory != null) {
|
||||
TransportClient client = clientFactory.createClient(host, port);
|
||||
new OneForOneBlockFetcher(client, appId, execId,
|
||||
blockIds1, listener1, conf, downloadFileManager).start();
|
||||
} else {
|
||||
logger.info("This clientFactory was closed. Skipping further block fetch retries.");
|
||||
}
|
||||
};
|
||||
|
||||
int maxRetries = conf.maxIORetries();
|
||||
|
|
Loading…
Reference in a new issue