Merge pull request #129 from mesos/rxin
Force serialize/deserialize task results in local execution mode.
This commit is contained in:
commit
32a4f4623c
|
@ -46,9 +46,15 @@ private class LocalScheduler(threads: Int, maxFailures: Int) extends DAGSchedule
|
|||
idInJob, bytes.size, timeTaken))
|
||||
val deserializedTask = ser.deserialize[Task[_]](bytes, currentThread.getContextClassLoader)
|
||||
val result: Any = deserializedTask.run(attemptId)
|
||||
|
||||
// Serialize and deserialize the result to emulate what the mesos
|
||||
// executor does. This is useful to catch serialization errors early
|
||||
// on in development (so when users move their local Spark programs
|
||||
// to the cluster, they don't get surprised by serialization errors).
|
||||
val resultToReturn = ser.deserialize[Any](ser.serialize(result))
|
||||
val accumUpdates = Accumulators.values
|
||||
logInfo("Finished task " + idInJob)
|
||||
taskEnded(task, Success, result, accumUpdates)
|
||||
taskEnded(task, Success, resultToReturn, accumUpdates)
|
||||
} catch {
|
||||
case t: Throwable => {
|
||||
logError("Exception in task " + idInJob, t)
|
||||
|
|
|
@ -65,5 +65,21 @@ class FailureSuite extends FunSuite {
|
|||
FailureSuiteState.clear()
|
||||
}
|
||||
|
||||
test("failure because task results are not serializable") {
|
||||
val sc = new SparkContext("local[1,1]", "test")
|
||||
val results = sc.makeRDD(1 to 3).map(x => new NonSerializable)
|
||||
|
||||
val thrown = intercept[spark.SparkException] {
|
||||
results.collect()
|
||||
}
|
||||
assert(thrown.getClass === classOf[spark.SparkException])
|
||||
assert(thrown.getMessage.contains("NotSerializableException"))
|
||||
|
||||
sc.stop()
|
||||
FailureSuiteState.clear()
|
||||
}
|
||||
|
||||
// TODO: Need to add tests with shuffle fetch failures.
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in a new issue