[SPARK-3747] TaskResultGetter could incorrectly abort a stage if it cannot get result for a specific task
Author: Reynold Xin <rxin@apache.org> Closes #2599 from rxin/SPARK-3747 and squashes the following commits: a74c04d [Reynold Xin] Added a line of comment explaining NonFatal 0e8d44c [Reynold Xin] [SPARK-3747] TaskResultGetter could incorrectly abort a stage if it cannot get result for a specific task
This commit is contained in:
parent
c5414b6818
commit
eb43043f41
|
@ -19,6 +19,8 @@ package org.apache.spark.scheduler
|
||||||
|
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
|
|
||||||
|
import scala.util.control.NonFatal
|
||||||
|
|
||||||
import org.apache.spark._
|
import org.apache.spark._
|
||||||
import org.apache.spark.TaskState.TaskState
|
import org.apache.spark.TaskState.TaskState
|
||||||
import org.apache.spark.serializer.SerializerInstance
|
import org.apache.spark.serializer.SerializerInstance
|
||||||
|
@ -32,7 +34,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
|
||||||
|
|
||||||
private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)
|
private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)
|
||||||
private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
|
private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
|
||||||
THREADS, "Result resolver thread")
|
THREADS, "task-result-getter")
|
||||||
|
|
||||||
protected val serializer = new ThreadLocal[SerializerInstance] {
|
protected val serializer = new ThreadLocal[SerializerInstance] {
|
||||||
override def initialValue(): SerializerInstance = {
|
override def initialValue(): SerializerInstance = {
|
||||||
|
@ -70,7 +72,8 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
|
||||||
case cnf: ClassNotFoundException =>
|
case cnf: ClassNotFoundException =>
|
||||||
val loader = Thread.currentThread.getContextClassLoader
|
val loader = Thread.currentThread.getContextClassLoader
|
||||||
taskSetManager.abort("ClassNotFound with classloader: " + loader)
|
taskSetManager.abort("ClassNotFound with classloader: " + loader)
|
||||||
case ex: Exception =>
|
// Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
|
||||||
|
case NonFatal(ex) =>
|
||||||
logError("Exception while getting task result", ex)
|
logError("Exception while getting task result", ex)
|
||||||
taskSetManager.abort("Exception while getting task result: %s".format(ex))
|
taskSetManager.abort("Exception while getting task result: %s".format(ex))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue