[SPARK-22557][TEST] Use ThreadSignaler explicitly

## What changes were proposed in this pull request?

ScalaTest 3.0 uses an implicit `Signaler`. This PR makes sure all Spark tests use `ThreadSignaler` explicitly, which gives the same default behavior as ScalaTest 2.2.x of interrupting a thread on the JVM when a time limit is exceeded. This will reduce potential flakiness.
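
For illustration, the pattern applied to each suite looks like the sketch below (`ExampleSuite` and its test body are hypothetical; the real change touches 15 existing suites, as shown in the diff):

```scala
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkFunSuite

class ExampleSuite extends SparkFunSuite with TimeLimits {

  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
  implicit val defaultSignaler: Signaler = ThreadSignaler

  test("finishes within the time limit") {
    // failAfter picks up the implicit ThreadSignaler and interrupts this thread on timeout
    failAfter(10.seconds) {
      // test body that must complete in time
    }
  }
}
```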

## How was this patch tested?

This is a test-suite-only update. It should pass the Jenkins tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19784 from dongjoon-hyun/use_thread_signaler.
Dongjoon Hyun authored on 2017-11-20 13:32:01 +09:00; committed by hyukjinkwon
parent d54bfec2e0
commit b10837ab1a
15 changed files with 68 additions and 23 deletions


@@ -18,7 +18,7 @@
 package org.apache.spark
 import org.scalatest.Matchers
-import org.scalatest.concurrent.TimeLimits._
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.{Millis, Span}
 import org.apache.spark.security.EncryptionFunSuite
@@ -30,7 +30,10 @@ class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {
 class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext
-  with EncryptionFunSuite {
+  with EncryptionFunSuite with TimeLimits {
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
   val clusterUrl = "local-cluster[2,1,1024]"


@@ -19,7 +19,7 @@ package org.apache.spark
 import java.io.File
-import org.scalatest.concurrent.TimeLimits
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.prop.TableDrivenPropertyChecks._
 import org.scalatest.time.SpanSugar._
@@ -27,6 +27,9 @@ import org.apache.spark.util.Utils
 class DriverSuite extends SparkFunSuite with TimeLimits {
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
   ignore("driver should exit after finishing without cleanup (SPARK-530)") {
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
     val masters = Table("master", "local", "local-cluster[2,1,1024]")


@@ -17,10 +17,14 @@
 package org.apache.spark
-import org.scalatest.concurrent.TimeLimits._
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.{Millis, Span}
-class UnpersistSuite extends SparkFunSuite with LocalSparkContext {
+class UnpersistSuite extends SparkFunSuite with LocalSparkContext with TimeLimits {
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
   test("unpersist RDD") {
     sc = new SparkContext("local", "test")
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()


@@ -31,7 +31,7 @@ import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path}
 import org.scalatest.{BeforeAndAfterEach, Matchers}
-import org.scalatest.concurrent.TimeLimits
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.SpanSugar._
 import org.apache.spark._
@@ -102,6 +102,9 @@ class SparkSubmitSuite
   import SparkSubmitSuite._
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
   override def beforeEach() {
     super.beforeEach()
     System.setProperty("spark.testing", "true")
@@ -1016,6 +1019,10 @@ class SparkSubmitSuite
 }
 object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
   def runSparkSubmit(args: Seq[String], root: String = ".."): Unit = {
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))


@@ -24,7 +24,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration.Duration
 import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.TimeLimits
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.SpanSugar._
 import org.apache.spark._
@@ -34,6 +34,9 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
   @transient private var sc: SparkContext = _
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
   override def beforeAll() {
     super.beforeAll()
     sc = new SparkContext("local[2]", "test")


@@ -25,7 +25,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
 import scala.language.reflectiveCalls
 import scala.util.control.NonFatal
-import org.scalatest.concurrent.TimeLimits
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.SpanSugar._
 import org.apache.spark._
@@ -102,6 +102,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
   import DAGSchedulerSuite._
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
   val conf = new SparkConf
   /** Set of TaskSets the DAGScheduler has requested executed. */
   val taskSets = scala.collection.mutable.Buffer[TaskSet]()


@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler
 import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
-import org.scalatest.concurrent.TimeLimits
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.{Seconds, Span}
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TaskContext}
@@ -34,6 +34,9 @@ class OutputCommitCoordinatorIntegrationSuite
   with LocalSparkContext
   with TimeLimits {
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
   override def beforeAll(): Unit = {
     super.beforeAll()
     val conf = new SparkConf()


@@ -31,8 +31,8 @@ import org.apache.commons.lang3.RandomUtils
 import org.mockito.{Matchers => mc}
 import org.mockito.Mockito.{mock, times, verify, when}
 import org.scalatest._
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.concurrent.Eventually._
-import org.scalatest.concurrent.TimeLimits._
 import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
@@ -57,10 +57,13 @@ import org.apache.spark.util.io.ChunkedByteBuffer
 class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
   with PrivateMethodTester with LocalSparkContext with ResetSystemProperties
-  with EncryptionFunSuite {
+  with EncryptionFunSuite with TimeLimits {
   import BlockManagerSuite._
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
   var conf: SparkConf = null
   var store: BlockManager = null
   var store2: BlockManager = null
@@ -1450,6 +1453,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 private object BlockManagerSuite {
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
   private implicit class BlockManagerTestUtils(store: BlockManager) {
     def dropFromMemoryIfExists(


@@ -23,13 +23,16 @@ import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.language.postfixOps
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.concurrent.Eventually._
-import org.scalatest.concurrent.TimeLimits
 import org.apache.spark.SparkFunSuite
 class EventLoopSuite extends SparkFunSuite with TimeLimits {
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
   test("EventLoop") {
     val buffer = new ConcurrentLinkedQueue[Int]
     val eventLoop = new EventLoop[Int]("test") {


@@ -22,16 +22,18 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.mutable
 import org.eclipse.jetty.util.ConcurrentHashSet
-import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.{Eventually, Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
-import org.scalatest.concurrent.TimeLimits._
 import org.scalatest.time.SpanSugar._
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.streaming.ProcessingTime
 import org.apache.spark.sql.streaming.util.StreamManualClock
-class ProcessingTimeExecutorSuite extends SparkFunSuite {
+class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
   val timeout = 10.seconds


@@ -69,7 +69,9 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
  */
 trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with BeforeAndAfterAll {
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
   override def afterAll(): Unit = {
     super.afterAll()
     StateStore.stop() // stop the state store maintenance thread and unload store providers


@@ -23,7 +23,7 @@ import java.util.Date
 import scala.collection.mutable.ArrayBuffer
-import org.scalatest.concurrent.TimeLimits
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.exceptions.TestFailedDueToTimeoutException
 import org.scalatest.time.SpanSugar._
@@ -33,6 +33,9 @@ import org.apache.spark.util.Utils
 trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits {
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
   // This is copied from org.apache.spark.deploy.SparkSubmitSuite
   protected def runSparkSubmit(args: Seq[String], sparkHomeOpt: Option[String] = None): Unit = {


@@ -38,6 +38,9 @@ import org.apache.spark.util.Utils
 /** Testsuite for testing the network receiver behavior */
 class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val signaler: Signaler = ThreadSignaler
   test("receiver life cycle") {
     val receiver = new FakeReceiver
@@ -60,8 +63,6 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
     // Verify that the receiver
     intercept[Exception] {
-      // Necessary to make failAfter interrupt awaitTermination() in ScalaTest 3.x
-      implicit val signaler: Signaler = ThreadSignaler
       failAfter(200 millis) {
         executingThread.join()
       }


@@ -44,6 +44,9 @@ import org.apache.spark.util.Utils
 class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeLimits with Logging {
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val signaler: Signaler = ThreadSignaler
   val master = "local[2]"
   val appName = this.getClass.getSimpleName
   val batchDuration = Milliseconds(500)
@@ -406,8 +409,6 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
     // test whether awaitTermination() does not exit if not time is given
     val exception = intercept[Exception] {
-      // Necessary to make failAfter interrupt awaitTermination() in ScalaTest 3.x
-      implicit val signaler: Signaler = ThreadSignaler
       failAfter(1000 millis) {
         ssc.awaitTermination()
         throw new Exception("Did not wait for stop")


@@ -24,18 +24,19 @@ import scala.collection.mutable
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers._
-import org.scalatest.concurrent.{Signaler, ThreadSignaler}
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.concurrent.Eventually._
-import org.scalatest.concurrent.TimeLimits._
 import org.scalatest.time.SpanSugar._
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.util.ManualClock
-class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
+class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLimits {
+  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
+  implicit val defaultSignaler: Signaler = ThreadSignaler
   private val blockIntervalMs = 10
   private val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms")
   @volatile private var blockGenerator: BlockGenerator = null