From a7d3fcd354289c1d0f5c80887b4f33beb3ad96a2 Mon Sep 17 00:00:00 2001
From: LantaoJin
Date: Mon, 4 Jan 2021 21:37:26 -0800
Subject: [PATCH] [SPARK-34000][CORE] Fix stageAttemptToNumSpeculativeTasks
 java.util.NoSuchElementException

### What changes were proposed in this pull request?

From the log below, stage 600 could be removed from `stageAttemptToNumSpeculativeTasks` by `onStageCompleted()`, but the speculative task 306.1 in stage 600 then threw a `NoSuchElementException` when its task-end event reached `onTaskEnd()`. This patch guards the decrement of `stageAttemptToNumSpeculativeTasks` with a `contains()` check, so a late speculative task-end event for an already-removed stage attempt is ignored.

```
21/01/04 03:00:32,259 WARN [task-result-getter-2] scheduler.TaskSetManager:69 : Lost task 306.1 in stage 600.0 (TID 283610, hdc49-mcc10-01-0510-4108-039-tess0097.stratus.rno.ebay.com, executor 27): TaskKilled (another attempt succeeded)
21/01/04 03:00:32,259 INFO [task-result-getter-2] scheduler.TaskSetManager:57 : Task 306.1 in stage 600.0 (TID 283610) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).
21/01/04 03:00:32,259 INFO [task-result-getter-2] cluster.YarnClusterScheduler:57 : Removed TaskSet 600.0, whose tasks have all completed, from pool default
21/01/04 03:00:32,259 INFO [HiveServer2-Handler-Pool: Thread-5853] thriftserver.SparkExecuteStatementOperation:190 : Returning result set with 50 rows from offsets [5378600, 5378650) with 1fe245f8-a7f9-4ec0-bcb5-8cf324cbbb47
21/01/04 03:00:32,260 ERROR [spark-listener-group-executorManagement] scheduler.AsyncEventQueue:94 : Listener ExecutorAllocationListener threw an exception
java.util.NoSuchElementException: key not found: Stage 600 (Attempt 0)
	at scala.collection.MapLike.default(MapLike.scala:235)
	at scala.collection.MapLike.default$(MapLike.scala:234)
	at scala.collection.AbstractMap.default(Map.scala:63)
	at scala.collection.mutable.HashMap.apply(HashMap.scala:69)
	at org.apache.spark.ExecutorAllocationManager$ExecutorAllocationListener.onTaskEnd(ExecutorAllocationManager.scala:621)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:45)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:38)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:38)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:115)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:99)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:116)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:116)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:102)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:97)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:97)
```

### Why are the changes needed?

To avoid throwing the `java.util.NoSuchElementException` shown above.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This is a protective patch, and the race is hard to reproduce in a unit test because the event order in an async queue is not fixed.

Closes #31025 from LantaoJin/SPARK-34000.
Authored-by: LantaoJin
Signed-off-by: Dongjoon Hyun
---
 .../scala/org/apache/spark/ExecutorAllocationManager.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 61ab635842..a83762ff01 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -798,7 +798,11 @@ private[spark] class ExecutorAllocationManager(
       }
       if (taskEnd.taskInfo.speculative) {
         stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove{taskIndex}}
-        stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
+        // If the previous task attempt succeeded first and it was the last task in a stage,
+        // the stage may have been removed before handling this speculative TaskEnd event.
+        if (stageAttemptToNumSpeculativeTasks.contains(stageAttempt)) {
+          stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
+        }
       }
 
       taskEnd.reason match {
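
For context, below is a minimal, self-contained Scala sketch of the race this patch guards against. It is illustrative only, not the actual `ExecutorAllocationManager` code: the object name, the `(stageId, attemptId)` key type, and the three handler methods are simplified stand-ins. It shows that `mutable.HashMap.apply` throws `java.util.NoSuchElementException` for a missing key, and how wrapping the decrement in a `contains()` check turns a late speculative task-end event into a no-op once the stage's bookkeeping has been cleaned up.

```scala
import scala.collection.mutable

// Illustrative stand-in for the listener's bookkeeping (hypothetical names, not Spark code).
object SpeculativeCountSketch {
  // Keyed by a simplified (stageId, stageAttemptId) pair.
  private val numSpeculativeTasks = mutable.HashMap[(Int, Int), Int]()

  def onSpeculativeTaskSubmitted(stage: (Int, Int)): Unit = {
    numSpeculativeTasks(stage) = numSpeculativeTasks.getOrElse(stage, 0) + 1
  }

  def onStageCompleted(stage: (Int, Int)): Unit = {
    // Cleanup path: once the stage completes, its entry is dropped.
    numSpeculativeTasks -= stage
  }

  def onSpeculativeTaskEnd(stage: (Int, Int)): Unit = {
    // Unguarded, this would throw if onStageCompleted already removed the key:
    //   numSpeculativeTasks(stage) -= 1   // java.util.NoSuchElementException
    if (numSpeculativeTasks.contains(stage)) {
      numSpeculativeTasks(stage) -= 1
    }
  }

  def main(args: Array[String]): Unit = {
    val stage = (600, 0)
    onSpeculativeTaskSubmitted(stage)
    onStageCompleted(stage)     // the stage finishes before the late TaskEnd event arrives
    onSpeculativeTaskEnd(stage) // safe: the guard makes this a no-op
    println(s"entries left: ${numSpeculativeTasks.size}") // entries left: 0
  }
}
```

The guard mirrors the defensive `get(...).foreach` access already used for `stageAttemptToSpeculativeTaskIndices` on the preceding line of the patch: in both cases a missing stage entry simply means the stage has already been cleaned up and there is nothing left to update.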