[SPARK-7989] [CORE] [TESTS] Fix flaky tests in ExternalShuffleServiceSuite and SparkListenerWithClusterSuite
The flaky tests in ExternalShuffleServiceSuite and SparkListenerWithClusterSuite will fail if there are not enough executors up before running the jobs.
This PR adds `JobProgressListener.waitUntilExecutorsUp`. The tests for the cluster mode can use it to wait until the expected executors are up.
Author: zsxwing <zsxwing@gmail.com>
Closes #6546 from zsxwing/SPARK-7989 and squashes the following commits:
5560e09 [zsxwing] Fix a typo
3b69840 [zsxwing] Fix flaky tests in ExternalShuffleServiceSuite and SparkListenerWithClusterSuite
(cherry picked from commit f27134782e
)
Signed-off-by: Andrew Or <andrew@databricks.com>
Conflicts:
core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
This commit is contained in:
parent
306837e4e3
commit
7e46ea0228
|
@ -17,8 +17,12 @@
|
|||
|
||||
package org.apache.spark.ui.jobs
|
||||
|
||||
import java.util.concurrent.TimeoutException
|
||||
|
||||
import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting
|
||||
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.annotation.DeveloperApi
|
||||
import org.apache.spark.executor.TaskMetrics
|
||||
|
@ -526,4 +530,30 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
|
|||
override def onApplicationStart(appStarted: SparkListenerApplicationStart) {
|
||||
startTime = appStarted.time
|
||||
}
|
||||
|
||||
/**
|
||||
* For testing only. Wait until at least `numExecutors` executors are up, or throw
|
||||
* `TimeoutException` if the waiting time elapsed before `numExecutors` executors up.
|
||||
*
|
||||
* @param numExecutors the number of executors to wait at least
|
||||
* @param timeout time to wait in milliseconds
|
||||
*/
|
||||
@VisibleForTesting
|
||||
private[spark] def waitUntilExecutorsUp(numExecutors: Int, timeout: Long): Unit = {
|
||||
val finishTime = System.currentTimeMillis() + timeout
|
||||
while (System.currentTimeMillis() < finishTime) {
|
||||
val numBlockManagers = synchronized {
|
||||
blockManagerIds.size
|
||||
}
|
||||
if (numBlockManagers >= numExecutors + 1) {
|
||||
// Need to count the block manager in driver
|
||||
return
|
||||
}
|
||||
// Sleep rather than using wait/notify, because this is used only for testing and wait/notify
|
||||
// add overhead in the general case.
|
||||
Thread.sleep(10)
|
||||
}
|
||||
throw new TimeoutException(
|
||||
s"Can't find $numExecutors executors before $timeout milliseconds elapsed")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,6 +55,14 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
|
|||
sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
|
||||
sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient])
|
||||
|
||||
// In a slow machine, one slave may register hundreds of milliseconds ahead of the other one.
|
||||
// If we don't wait for all salves, it's possible that only one executor runs all jobs. Then
|
||||
// all shuffle blocks will be in this executor, ShuffleBlockFetcherIterator will directly fetch
|
||||
// local blocks from the local BlockManager and won't send requests to ExternalShuffleService.
|
||||
// In this case, we won't receive FetchFailed. And it will make this test fail.
|
||||
// Therefore, we should wait until all salves are up
|
||||
sc.jobProgressListener.waitUntilExecutorsUp(2, 10000)
|
||||
|
||||
val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ + _)
|
||||
|
||||
rdd.count()
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package org.apache.spark.broadcast
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Random
|
||||
|
||||
import org.scalatest.{Assertions, FunSuite}
|
||||
|
@ -312,13 +311,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
|
|||
val _sc =
|
||||
new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
|
||||
// Wait until all salves are up
|
||||
eventually(timeout(10.seconds), interval(10.milliseconds)) {
|
||||
_sc.jobProgressListener.synchronized {
|
||||
val numBlockManagers = _sc.jobProgressListener.blockManagerIds.size
|
||||
assert(numBlockManagers == numSlaves + 1,
|
||||
s"Expect ${numSlaves + 1} block managers, but was ${numBlockManagers}")
|
||||
}
|
||||
}
|
||||
_sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 10000)
|
||||
_sc
|
||||
} else {
|
||||
new SparkContext("local", "test", broadcastConf)
|
||||
|
|
|
@ -17,12 +17,12 @@
|
|||
|
||||
package org.apache.spark.scheduler
|
||||
|
||||
import org.apache.spark.scheduler.cluster.ExecutorInfo
|
||||
import org.apache.spark.{SparkContext, LocalSparkContext}
|
||||
import scala.collection.mutable
|
||||
|
||||
import org.scalatest.{FunSuite, BeforeAndAfter, BeforeAndAfterAll}
|
||||
|
||||
import scala.collection.mutable
|
||||
import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
|
||||
import org.apache.spark.scheduler.cluster.ExecutorInfo
|
||||
|
||||
/**
|
||||
* Unit tests for SparkListener that require a local cluster.
|
||||
|
@ -41,6 +41,10 @@ class SparkListenerWithClusterSuite extends FunSuite with LocalSparkContext
|
|||
val listener = new SaveExecutorInfo
|
||||
sc.addSparkListener(listener)
|
||||
|
||||
// This test will check if the number of executors received by "SparkListener" is same as the
|
||||
// number of all executors, so we need to wait until all executors are up
|
||||
sc.jobProgressListener.waitUntilExecutorsUp(2, 10000)
|
||||
|
||||
val rdd1 = sc.parallelize(1 to 100, 4)
|
||||
val rdd2 = rdd1.map(_.toString)
|
||||
rdd2.setName("Target RDD")
|
||||
|
|
Loading…
Reference in a new issue