From 7e46ea0228f142f6b384331d62cec8f86e61c9d1 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Wed, 3 Jun 2015 15:04:20 -0700
Subject: [PATCH] [SPARK-7989] [CORE] [TESTS] Fix flaky tests in
 ExternalShuffleServiceSuite and SparkListenerWithClusterSuite

The flaky tests in ExternalShuffleServiceSuite and
SparkListenerWithClusterSuite will fail if there are not enough executors
up before running the jobs. This PR adds
`JobProgressListener.waitUntilExecutorsUp`, which tests for cluster mode
can use to wait until the expected executors are up.

Author: zsxwing

Closes #6546 from zsxwing/SPARK-7989 and squashes the following commits:

5560e09 [zsxwing] Fix a typo
3b69840 [zsxwing] Fix flaky tests in ExternalShuffleServiceSuite and SparkListenerWithClusterSuite

(cherry picked from commit f27134782ebb61c360330e2d6d5bb1aa02be3fb6)
Signed-off-by: Andrew Or

Conflicts:
	core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
	core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
---
 .../spark/ui/jobs/JobProgressListener.scala   | 30 +++++++++++++++++++
 .../spark/ExternalShuffleServiceSuite.scala   |  8 +++++
 .../spark/broadcast/BroadcastSuite.scala      |  9 +-----
 .../SparkListenerWithClusterSuite.scala       | 10 +++++--
 4 files changed, 46 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index f39e961772..1d31fce4c6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -17,8 +17,12 @@
 
 package org.apache.spark.ui.jobs
 
+import java.util.concurrent.TimeoutException
+
 import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
 
+import com.google.common.annotations.VisibleForTesting
+
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
@@ -526,4 +530,30 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   override def onApplicationStart(appStarted: SparkListenerApplicationStart) {
     startTime = appStarted.time
   }
+
+  /**
+   * For testing only. Wait until at least `numExecutors` executors are up, or throw a
+   * `TimeoutException` if the waiting time elapses before `numExecutors` executors are up.
+   *
+   * @param numExecutors the minimum number of executors to wait for
+   * @param timeout time to wait in milliseconds
+   */
+  @VisibleForTesting
+  private[spark] def waitUntilExecutorsUp(numExecutors: Int, timeout: Long): Unit = {
+    val finishTime = System.currentTimeMillis() + timeout
+    while (System.currentTimeMillis() < finishTime) {
+      val numBlockManagers = synchronized {
+        blockManagerIds.size
+      }
+      if (numBlockManagers >= numExecutors + 1) {
+        // Need to count the block manager in the driver
+        return
+      }
+      // Sleep rather than using wait/notify, because this is used only for testing and wait/notify
+      // add overhead in the general case.
+      Thread.sleep(10)
+    }
+    throw new TimeoutException(
+      s"Can't find $numExecutors executors before $timeout milliseconds elapsed")
+  }
 }

diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index bac6fdbcdc..5b127a070c 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -55,6 +55,14 @@
     sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
     sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient])
 
+    // On a slow machine, one slave may register hundreds of milliseconds ahead of the other one.
+    // If we don't wait for all slaves, it's possible that only one executor runs all jobs. Then
+    // all shuffle blocks will be in this executor, ShuffleBlockFetcherIterator will directly fetch
+    // local blocks from the local BlockManager and won't send requests to ExternalShuffleService.
+    // In this case, we won't receive FetchFailed, and this test will fail.
+    // Therefore, we should wait until all slaves are up.
+    sc.jobProgressListener.waitUntilExecutorsUp(2, 10000)
+
     val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ + _)
 
     rdd.count()

diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index c38e306b6a..4c85857067 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.broadcast
 
-import scala.concurrent.duration._
 import scala.util.Random
 
 import org.scalatest.{Assertions, FunSuite}
@@ -312,13 +311,7 @@
       val _sc =
         new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
       // Wait until all salves are up
-      eventually(timeout(10.seconds), interval(10.milliseconds)) {
-        _sc.jobProgressListener.synchronized {
-          val numBlockManagers = _sc.jobProgressListener.blockManagerIds.size
-          assert(numBlockManagers == numSlaves + 1,
-            s"Expect ${numSlaves + 1} block managers, but was ${numBlockManagers}")
-        }
-      }
+      _sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 10000)
       _sc
     } else {
       new SparkContext("local", "test", broadcastConf)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
index 10c43b830b..2ce1b5b535 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark.scheduler
 
-import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.{SparkContext, LocalSparkContext}
+import scala.collection.mutable
 
 import org.scalatest.{FunSuite, BeforeAndAfter, BeforeAndAfterAll}
 
-import scala.collection.mutable
+import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 
 /**
  * Unit tests for SparkListener that require a local cluster.
@@ -41,6 +41,10 @@ class SparkListenerWithClusterSuite extends FunSuite with LocalSparkContext
     val listener = new SaveExecutorInfo
     sc.addSparkListener(listener)
 
+    // This test will check whether the number of executors received by "SparkListener" is the
+    // same as the number of all executors, so we need to wait until all executors are up
+    sc.jobProgressListener.waitUntilExecutorsUp(2, 10000)
+
     val rdd1 = sc.parallelize(1 to 100, 4)
     val rdd2 = rdd1.map(_.toString)
    rdd2.setName("Target RDD")
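
For reference, below is a minimal sketch of how a cluster-mode test can call the new helper. The suite and test names are hypothetical (not part of this patch); the sketch assumes Spark's test classpath (ScalaTest's FunSuite and the LocalSparkContext trait) and must live under the org.apache.spark package, since waitUntilExecutorsUp is private[spark]:

    package org.apache.spark

    import org.scalatest.FunSuite

    // Hypothetical example suite; LocalSparkContext provides the `sc` variable
    // and stops it after each test.
    class WaitForExecutorsExampleSuite extends FunSuite with LocalSparkContext {

      test("run a job only after both executors are up") {
        // Two slaves with one core and 512 MB of memory each, as in the patched tests.
        sc = new SparkContext("local-cluster[2, 1, 512]", "test")

        // Poll until 2 executors have registered their block managers (the
        // driver's own block manager is counted internally), or throw
        // TimeoutException after 10 seconds.
        sc.jobProgressListener.waitUntilExecutorsUp(2, 10000)

        // With both executors up, the job is no longer at risk of running
        // entirely on a single early-registering executor.
        assert(sc.parallelize(1 to 100, 4).count() === 100L)
      }
    }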