[SPARK-13408] [CORE] Ignore errors when it's already reported in JobWaiter

## What changes were proposed in this pull request?

`JobWaiter.taskSucceeded` will be called for each task. When `resultHandler` throws an exception, `taskSucceeded` will rethrow it for each task, and DAGScheduler just catches it and reports the failure like this:
```Scala
try {
  job.listener.taskSucceeded(rt.outputId, event.result)
} catch {
  case e: Exception =>
    // TODO: Perhaps we want to mark the resultStage as failed?
    job.listener.jobFailed(new SparkDriverExecutionException(e))
}
```
Therefore `JobWaiter.jobFailed` may be called multiple times.

So `JobWaiter.jobFailed` should use `Promise.tryFailure` instead of `Promise.failure`: the latter throws an `IllegalStateException` if the promise has already been completed, while `tryFailure` simply returns `false`.
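For reference, here is a minimal standalone sketch of the difference (plain `scala.concurrent`, not Spark code; the `PromiseFailureDemo` object is purely illustrative):

```Scala
import scala.concurrent.Promise
import scala.util.Try

object PromiseFailureDemo extends App {
  val p = Promise[Unit]()

  // The first failure completes the promise.
  p.failure(new RuntimeException("Oops 1"))

  // tryFailure on an already-completed promise is a no-op returning false.
  println(p.tryFailure(new RuntimeException("Oops 2"))) // false

  // failure on an already-completed promise throws IllegalStateException.
  println(Try(p.failure(new RuntimeException("Oops 3"))))
  // Failure(java.lang.IllegalStateException: Promise already completed.)
}
```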

## How was this patch tested?

Jenkins tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11280 from zsxwing/SPARK-13408.
Authored by Shixiong Zhu on 2016-02-19 23:00:08 -08:00, committed by Davies Liu
parent 6624a588c1
commit dfb2ae2f14
2 changed files with 49 additions and 3 deletions

core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala

```diff
@@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.concurrent.{Future, Promise}
 
+import org.apache.spark.Logging
+
 /**
  * An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their
  * results to the given handler function.
@@ -30,7 +32,7 @@ private[spark] class JobWaiter[T](
     val jobId: Int,
     totalTasks: Int,
     resultHandler: (Int, T) => Unit)
-  extends JobListener {
+  extends JobListener with Logging {
 
   private val finishedTasks = new AtomicInteger(0)
   // If the job is finished, this will be its result. In the case of 0 task jobs (e.g. zero
@@ -61,7 +63,10 @@ private[spark] class JobWaiter[T](
     }
   }
 
-  override def jobFailed(exception: Exception): Unit =
-    jobPromise.failure(exception)
+  override def jobFailed(exception: Exception): Unit = {
+    if (!jobPromise.tryFailure(exception)) {
+      logWarning("Ignore failure", exception)
+    }
+  }
 
 }
```

core/src/test/scala/org/apache/spark/scheduler/JobWaiterSuite.scala

```diff
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.util.Failure
+
+import org.apache.spark.SparkFunSuite
+
+class JobWaiterSuite extends SparkFunSuite {
+
+  test("call jobFailed multiple times") {
+    val waiter = new JobWaiter[Int](null, 0, totalTasks = 2, null)
+
+    // Should not throw an exception if calling jobFailed multiple times
+    waiter.jobFailed(new RuntimeException("Oops 1"))
+    waiter.jobFailed(new RuntimeException("Oops 2"))
+    waiter.jobFailed(new RuntimeException("Oops 3"))
+
+    waiter.completionFuture.value match {
+      case Some(Failure(e)) =>
+        // We should receive the first exception
+        assert("Oops 1" === e.getMessage)
+      case other => fail("Should have received the first exception, but got " + other)
+    }
+  }
+}
```
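To reproduce the new suite locally (outside Jenkins), an invocation along these lines should work, assuming Spark's standard `build/sbt` wrapper; the exact command is not part of this patch:

```
build/sbt "core/testOnly *JobWaiterSuite"
```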