[SPARK-7655][Core] Deserializing value should not hold the TaskSchedulerImpl lock

We should not call `DirectTaskResult.value` when holding the `TaskSchedulerImpl` lock. It may cost dozens of seconds to deserialize a large object.
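
As an illustration only, here is a minimal self-contained Scala sketch of the memoization pattern this patch introduces. `CachedResult` and `CachedResultDemo` are hypothetical stand-ins (using plain Java serialization instead of Spark's `SparkEnv` serializer), not the actual Spark classes: the first `value()` call pays the deserialization cost, and later calls just return the cached object.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for DirectTaskResult.
class CachedResult[T](valueBytes: Array[Byte]) {
  private var deserialized = false
  private var valueObject: T = _

  def value(): T = {
    if (deserialized) {
      valueObject  // after the first call, this is just a field read
    } else {
      // Potentially expensive for a large value: never make the first call
      // while holding a shared lock.
      val in = new ObjectInputStream(new ByteArrayInputStream(valueBytes))
      valueObject = in.readObject().asInstanceOf[T]
      in.close()
      deserialized = true
      valueObject
    }
  }
}

object CachedResultDemo extends App {
  val bytes = {
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    out.writeObject("hello")
    out.close()
    bos.toByteArray
  }
  val result = new CachedResult[String](bytes)
  result.value()                // pay the deserialization cost with no lock held
  this.synchronized {
    println(result.value())     // under a lock: just returns the cached object
  }
}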

Author: zsxwing <zsxwing@gmail.com>

Closes #6195 from zsxwing/SPARK-7655 and squashes the following commits:

21f502e [zsxwing] Add more comments
e25fa88 [zsxwing] Add comments
15010b5 [zsxwing] Deserialize value should not hold the TaskSchedulerImpl lock

(cherry picked from commit 3b6ef2c539)
Signed-off-by: Reynold Xin <rxin@databricks.com>
3 changed files with 31 additions and 2 deletions

core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala

@@ -40,6 +40,9 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any],
     var metrics: TaskMetrics)
   extends TaskResult[T] with Externalizable {

+  private var valueObjectDeserialized = false
+  private var valueObject: T = _
+
   def this() = this(null.asInstanceOf[ByteBuffer], null, null)

   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
@@ -72,10 +75,26 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any],
       }
     }
     metrics = in.readObject().asInstanceOf[TaskMetrics]
+    valueObjectDeserialized = false
   }

+  /**
+   * When `value()` is called for the first time, it needs to deserialize `valueObject` from
+   * `valueBytes`. That may cost dozens of seconds for a large instance, so the caller of the
+   * first `value()` invocation should avoid blocking other threads.
+   *
+   * After the first time, `value()` is trivial and just returns the deserialized `valueObject`.
+   */
   def value(): T = {
-    val resultSer = SparkEnv.get.serializer.newInstance()
-    resultSer.deserialize(valueBytes)
+    if (valueObjectDeserialized) {
+      valueObject
+    } else {
+      // This should not run while holding a lock, because it may cost dozens of seconds for a
+      // large value.
+      val resultSer = SparkEnv.get.serializer.newInstance()
+      valueObject = resultSer.deserialize(valueBytes)
+      valueObjectDeserialized = true
+      valueObject
+    }
   }
 }

core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala

@@ -54,6 +54,10 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
               if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
                 return
               }
+              // Deserialize "value" without holding any lock so that it won't block other
+              // threads. We call it here so that when it is called again in
+              // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
+              directResult.value()
               (directResult, serializedData.limit())
             case IndirectTaskResult(blockId, size) =>
               if (!taskSetManager.canFetchMoreResults(size)) {

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

@@ -620,6 +620,12 @@ private[spark] class TaskSetManager(
     val index = info.index
     info.markSuccessful()
     removeRunningTask(tid)
+    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask", which holds the
+    // "TaskSchedulerImpl" lock until it exits. To avoid the SPARK-7655 issue, we should not
+    // deserialize the value while holding a lock, so "result.value()" is called in
+    // "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
+    // Note: "result.value()" only deserializes the value the first time it is called, so here
+    // it just returns the already-deserialized value and won't block other threads.
     sched.dagScheduler.taskEnded(
       tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
     if (!successful(index)) {
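
Putting the pieces together, the call ordering the patch relies on can be sketched as follows, reusing the hypothetical `CachedResult` stand-in from above (a simplification, not the actual Spark scheduler classes): the expensive first `value()` call happens on the unlocked enqueue path, so the later call made while the lock is held only reads the cache.

// Hypothetical sketch of the call ordering: force the expensive deserialization
// before taking the lock, then read the cached value while the lock is held.
class Scheduler {
  private val lock = new Object

  def enqueueSuccessfulTask(result: CachedResult[String]): Unit = {
    // No lock held here: this is where the deserialization cost is paid.
    result.value()
    handleSuccessfulTask(result)
  }

  private def handleSuccessfulTask(result: CachedResult[String]): Unit = {
    lock.synchronized {
      // Lock held: value() just returns the cached object, so other threads
      // waiting on this lock are not blocked by deserialization.
      println(result.value())
    }
  }
}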