[SPARK-27404][CORE][SQL][STREAMING][YARN] Fix build warnings for 3.0: postfixOps edition

## What changes were proposed in this pull request?

Fix build warnings -- see some details below.

But mostly, remove uses of postfix syntax that cause warnings without the `scala.language.postfixOps` import. These occur mostly in duration expressions like `120000 milliseconds`, which I'd like to simplify to forms like `2.minutes` anyway (see the sketch below).
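
For reference, a minimal sketch of the two duration styles (not taken from the diff; the object name is just for illustration and the values mirror the example above):

```scala
import scala.concurrent.duration._

object DurationStyles {
  // Old style: postfix operator notation. Without `import scala.language.postfixOps`
  // this produces a postfix-operator feature warning in the build, which is what
  // this change cleans up.
  // val oldTimeout = 120000 milliseconds

  // New style: an ordinary method call via the DurationInt implicit; no extra
  // language import needed, and easier to read as a duration.
  val newTimeout: FiniteDuration = 2.minutes

  // Both forms denote the same length of time.
  assert(newTimeout == 120000.milliseconds)
}
```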

## How was this patch tested?

Existing tests.

Closes #24314 from srowen/SPARK-27404.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Sean Owen 2019-04-11 13:43:44 -05:00
parent 43da473c1c
commit 4ec7f631aa
71 changed files with 458 additions and 459 deletions


@ -46,9 +46,9 @@ import static org.apache.spark.network.shuffle.RetryingBlockFetcher.BlockFetchSt
*/
public class RetryingBlockFetcherSuite {
ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
private final ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
private final ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
private final ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
@Test
public void testNoFailures() throws IOException, InterruptedException {
@ -291,7 +291,7 @@ public class RetryingBlockFetcherSuite {
}
assertNotNull(stub);
stub.when(fetchStarter).createAndStart(any(), anyObject());
stub.when(fetchStarter).createAndStart(any(), any());
String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
}


@ -20,7 +20,6 @@ package org.apache.spark
import java.util.{Properties, Timer, TimerTask}
import scala.concurrent.duration._
import scala.language.postfixOps
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.executor.TaskMetrics
@ -122,7 +121,7 @@ class BarrierTaskContext private[spark] (
barrierEpoch),
// Set a fixed timeout for RPC here, so users shall get a SparkException thrown by
// BarrierCoordinator on timeout, instead of RPCTimeoutException from the RPC framework.
timeout = new RpcTimeout(31536000 /* = 3600 * 24 * 365 */ seconds, "barrierTimeout"))
timeout = new RpcTimeout(365.days, "barrierTimeout"))
barrierEpoch += 1
logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) finished " +
"global sync successfully, waited for " +


@ -26,7 +26,6 @@ import scala.collection.mutable.ListBuffer
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.sys.process._
import org.json4s._
@ -112,7 +111,7 @@ private object FaultToleranceTest extends App with Logging {
assertValidClusterState()
killLeader()
delay(30 seconds)
delay(30.seconds)
assertValidClusterState()
createClient()
assertValidClusterState()
@ -126,12 +125,12 @@ private object FaultToleranceTest extends App with Logging {
killLeader()
addMasters(1)
delay(30 seconds)
delay(30.seconds)
assertValidClusterState()
killLeader()
addMasters(1)
delay(30 seconds)
delay(30.seconds)
assertValidClusterState()
}
@ -156,7 +155,7 @@ private object FaultToleranceTest extends App with Logging {
killLeader()
workers.foreach(_.kill())
workers.clear()
delay(30 seconds)
delay(30.seconds)
addWorkers(2)
assertValidClusterState()
}
@ -174,7 +173,7 @@ private object FaultToleranceTest extends App with Logging {
(1 to 3).foreach { _ =>
killLeader()
delay(30 seconds)
delay(30.seconds)
assertValidClusterState()
assertTrue(getLeader == masters.head)
addMasters(1)
@ -264,7 +263,7 @@ private object FaultToleranceTest extends App with Logging {
}
// Avoid waiting indefinitely (e.g., we could register but get no executors).
assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
assertTrue(ThreadUtils.awaitResult(f, 2.minutes))
}
/**
@ -317,7 +316,7 @@ private object FaultToleranceTest extends App with Logging {
}
try {
assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
assertTrue(ThreadUtils.awaitResult(f, 2.minutes))
} catch {
case e: TimeoutException =>
logError("Master states: " + masters.map(_.state))
@ -421,7 +420,7 @@ private object SparkDocker {
}
dockerCmd.run(ProcessLogger(findIpAndLog _))
val ip = ThreadUtils.awaitResult(ipPromise.future, 30 seconds)
val ip = ThreadUtils.awaitResult(ipPromise.future, 30.seconds)
val dockerId = Docker.getLastProcessId
(ip, dockerId, outFile)
}


@ -52,7 +52,7 @@ private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: S
*
* @note This can be used in the recover callback of a Future to add to a TimeoutException
* Example:
* val timeout = new RpcTimeout(5 millis, "short timeout")
* val timeout = new RpcTimeout(5.milliseconds, "short timeout")
* Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout)
*/
def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {


@ -27,7 +27,6 @@ import scala.collection.Map
import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
import scala.concurrent.duration._
import scala.language.existentials
import scala.language.postfixOps
import scala.util.control.NonFatal
import org.apache.commons.lang3.SerializationUtils
@ -270,7 +269,7 @@ private[spark] class DAGScheduler(
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates,
Some(executorUpdates)))
blockManagerMaster.driverEndpoint.askSync[Boolean](
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(10.minutes, "BlockManagerHeartbeat"))
}
/**


@ -109,12 +109,12 @@ private[spark] object UIUtils extends Logging {
}
}
// if time is more than a year
return s"$yearString $weekString $dayString"
s"$yearString $weekString $dayString"
} catch {
case e: Exception =>
logError("Error converting time to string", e)
// if there is some error, return blank string
return ""
""
}
}
@ -336,7 +336,7 @@ private[spark] object UIUtils extends Logging {
def getHeaderContent(header: String): Seq[Node] = {
if (newlinesInHeader) {
<ul class="unstyled">
{ header.split("\n").map { case t => <li> {t} </li> } }
{ header.split("\n").map(t => <li> {t} </li>) }
</ul>
} else {
Text(header)
@ -446,7 +446,7 @@ private[spark] object UIUtils extends Logging {
* the whole string will rendered as a simple escaped text.
*
* Note: In terms of security, only anchor tags with root relative links are supported. So any
* attempts to embed links outside Spark UI, or other tags like {@code <script>} will cause in
* attempts to embed links outside Spark UI, or other tags like &lt;script&gt; will cause in
* the whole description to be treated as plain text.
*
* @param desc the original job or stage description string, which may contain html tags.
@ -458,7 +458,6 @@ private[spark] object UIUtils extends Logging {
* is true, and an Elem otherwise.
*/
def makeDescription(desc: String, basePathUri: String, plainText: Boolean = false): NodeSeq = {
import scala.language.postfixOps
// If the description can be parsed as HTML and has only relative links, then render
// as HTML, otherwise render as escaped string
@ -468,9 +467,7 @@ private[spark] object UIUtils extends Logging {
// Verify that this has only anchors and span (we are wrapping in span)
val allowedNodeLabels = Set("a", "span", "br")
val illegalNodes = xml \\ "_" filterNot { case node: Node =>
allowedNodeLabels.contains(node.label)
}
val illegalNodes = (xml \\ "_").filterNot(node => allowedNodeLabels.contains(node.label))
if (illegalNodes.nonEmpty) {
throw new IllegalArgumentException(
"Only HTML anchors allowed in job descriptions\n" +
@ -491,8 +488,8 @@ private[spark] object UIUtils extends Logging {
new RewriteRule() {
override def transform(n: Node): Seq[Node] = {
n match {
case e: Elem if e.child isEmpty => Text(e.text)
case e: Elem if e.child nonEmpty => Text(e.child.flatMap(transform).text)
case e: Elem if e.child.isEmpty => Text(e.text)
case e: Elem => Text(e.child.flatMap(transform).text)
case _ => n
}
}
@ -503,7 +500,7 @@ private[spark] object UIUtils extends Logging {
new RewriteRule() {
override def transform(n: Node): Seq[Node] = {
n match {
case e: Elem if e \ "@href" nonEmpty =>
case e: Elem if (e \ "@href").nonEmpty =>
val relativePath = e.attribute("href").get.toString
val fullUri = s"${basePathUri.stripSuffix("/")}/${relativePath.stripPrefix("/")}"
e % Attribute(null, "href", fullUri, Null)


@ -18,7 +18,6 @@
package org.apache.spark
import scala.concurrent.duration._
import scala.language.postfixOps
import org.apache.spark.internal.config._
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
@ -52,7 +51,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
)
val error = intercept[SparkException] {
ThreadUtils.awaitResult(futureAction, 5 seconds)
ThreadUtils.awaitResult(futureAction, 5.seconds)
}.getCause.getMessage
assert(error.contains(message))
}


@ -43,7 +43,7 @@ import org.apache.spark.storage._
abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[SortShuffleManager])
extends SparkFunSuite with BeforeAndAfter with LocalSparkContext
{
implicit val defaultTimeout = timeout(10000 millis)
implicit val defaultTimeout = timeout(10.seconds)
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("ContextCleanerSuite")
@ -159,7 +159,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
runGC()
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
preGCTester.assertCleanup()(timeout(1.second))
}
// Test that GC causes RDD cleanup after dereferencing the RDD
@ -178,7 +178,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
runGC()
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
preGCTester.assertCleanup()(timeout(1.second))
}
rdd.count() // Defeat early collection by the JVM
@ -196,7 +196,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
runGC()
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
preGCTester.assertCleanup()(timeout(1.second))
}
// Test that GC causes broadcast cleanup after dereferencing the broadcast variable
@ -272,7 +272,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
runGC()
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
preGCTester.assertCleanup()(timeout(1.second))
}
// Test that RDD going out of scope does cause the checkpoint blocks to be cleaned up
@ -294,7 +294,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
runGC()
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
preGCTester.assertCleanup()(timeout(1.second))
}
// Test that GC triggers the cleanup of all variables after the dereferencing them
@ -334,7 +334,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
runGC()
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
preGCTester.assertCleanup()(timeout(1.second))
}
// Test that GC triggers the cleanup of all variables after the dereferencing them
@ -408,7 +408,7 @@ class CleanerTester(
/** Assert that all the stuff has been cleaned up */
def assertCleanup()(implicit waitTimeout: PatienceConfiguration.Timeout) {
try {
eventually(waitTimeout, interval(100 millis)) {
eventually(waitTimeout, interval(100.milliseconds)) {
assert(isAllCleanedUp,
"The following resources were not cleaned up:\n" + uncleanedResourcesToString)
}


@ -33,12 +33,12 @@ class DriverSuite extends SparkFunSuite with TimeLimits {
ignore("driver should exit after finishing without cleanup (SPARK-530)") {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val masters = Table("master", "local", "local-cluster[2,1,1024]")
forAll(masters) { (master: String) =>
forAll(masters) { master =>
val process = Utils.executeCommand(
Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
new File(sparkHome),
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
failAfter(60 seconds) { process.waitFor() }
failAfter(1.minute) { process.waitFor() }
// Ensure we still kill the process in case it timed out
process.destroy()
}


@ -250,7 +250,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
assert(e.getMessage contains "cancel")
// Once A is cancelled, job B should finish fairly quickly.
assert(ThreadUtils.awaitResult(jobB, 60.seconds) === 100)
assert(ThreadUtils.awaitResult(jobB, 1.minute) === 100)
}
test("task reaper will not kill JVM if spark.task.killTimeout == -1") {
@ -290,7 +290,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
assert(e.getMessage contains "cancel")
// Once A is cancelled, job B should finish fairly quickly.
assert(ThreadUtils.awaitResult(jobB, 60.seconds) === 100)
assert(ThreadUtils.awaitResult(jobB, 1.minute) === 100)
}
test("two jobs sharing the same stage") {


@ -21,7 +21,6 @@ import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.{Random, Try}
import com.esotericsoftware.kryo.Kryo
@ -279,10 +278,10 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
assert(RpcUtils.retryWaitMs(conf) === 2L)
conf.set("spark.akka.askTimeout", "3")
assert(RpcUtils.askRpcTimeout(conf).duration === (3 seconds))
assert(RpcUtils.askRpcTimeout(conf).duration === 3.seconds)
conf.set("spark.akka.lookupTimeout", "4")
assert(RpcUtils.lookupRpcTimeout(conf).duration === (4 seconds))
assert(RpcUtils.lookupRpcTimeout(conf).duration === 4.seconds)
}
test("SPARK-13727") {


@ -19,7 +19,6 @@ package org.apache.spark
import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.language.postfixOps
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._
@ -31,25 +30,25 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont
test("basic status API usage") {
sc = new SparkContext("local", "test", new SparkConf(false))
val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync()
val jobId: Int = eventually(timeout(10 seconds)) {
val jobId: Int = eventually(timeout(10.seconds)) {
val jobIds = jobFuture.jobIds
jobIds.size should be(1)
jobIds.head
}
val jobInfo = eventually(timeout(10 seconds)) {
val jobInfo = eventually(timeout(10.seconds)) {
sc.statusTracker.getJobInfo(jobId).get
}
jobInfo.status() should not be FAILED
val stageIds = jobInfo.stageIds()
stageIds.size should be(2)
val firstStageInfo = eventually(timeout(10 seconds)) {
val firstStageInfo = eventually(timeout(10.seconds)) {
sc.statusTracker.getStageInfo(stageIds.min).get
}
firstStageInfo.stageId() should be(stageIds.min)
firstStageInfo.currentAttemptId() should be(0)
firstStageInfo.numTasks() should be(2)
eventually(timeout(10 seconds)) {
eventually(timeout(10.seconds)) {
val updatedFirstStageInfo = sc.statusTracker.getStageInfo(stageIds.min).get
updatedFirstStageInfo.numCompletedTasks() should be(2)
updatedFirstStageInfo.numActiveTasks() should be(0)
@ -61,27 +60,27 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont
sc = new SparkContext("local", "test", new SparkConf(false))
// Passing `null` should return jobs that were not run in a job group:
val defaultJobGroupFuture = sc.parallelize(1 to 1000).countAsync()
val defaultJobGroupJobId = eventually(timeout(10 seconds)) {
val defaultJobGroupJobId = eventually(timeout(10.seconds)) {
defaultJobGroupFuture.jobIds.head
}
eventually(timeout(10 seconds)) {
eventually(timeout(10.seconds)) {
sc.statusTracker.getJobIdsForGroup(null).toSet should be (Set(defaultJobGroupJobId))
}
// Test jobs submitted in job groups:
sc.setJobGroup("my-job-group", "description")
sc.statusTracker.getJobIdsForGroup("my-job-group") should be (Seq.empty)
val firstJobFuture = sc.parallelize(1 to 1000).countAsync()
val firstJobId = eventually(timeout(10 seconds)) {
val firstJobId = eventually(timeout(10.seconds)) {
firstJobFuture.jobIds.head
}
eventually(timeout(10 seconds)) {
eventually(timeout(10.seconds)) {
sc.statusTracker.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId))
}
val secondJobFuture = sc.parallelize(1 to 1000).countAsync()
val secondJobId = eventually(timeout(10 seconds)) {
val secondJobId = eventually(timeout(10.seconds)) {
secondJobFuture.jobIds.head
}
eventually(timeout(10 seconds)) {
eventually(timeout(10.seconds)) {
sc.statusTracker.getJobIdsForGroup("my-job-group").toSet should be (
Set(firstJobId, secondJobId))
}
@ -92,10 +91,10 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont
sc.setJobGroup("my-job-group2", "description")
sc.statusTracker.getJobIdsForGroup("my-job-group2") shouldBe empty
val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1)
val firstJobId = eventually(timeout(10 seconds)) {
val firstJobId = eventually(timeout(10.seconds)) {
firstJobFuture.jobIds.head
}
eventually(timeout(10 seconds)) {
eventually(timeout(10.seconds)) {
sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq(firstJobId))
}
}
@ -105,10 +104,10 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont
sc.setJobGroup("my-job-group2", "description")
sc.statusTracker.getJobIdsForGroup("my-job-group2") shouldBe empty
val firstJobFuture = sc.parallelize(1 to 1000, 2).takeAsync(999)
val firstJobId = eventually(timeout(10 seconds)) {
eventually(timeout(10.seconds)) {
firstJobFuture.jobIds.head
}
eventually(timeout(10 seconds)) {
eventually(timeout(10.seconds)) {
sc.statusTracker.getJobIdsForGroup("my-job-group2") should have size 2
}
}


@ -1277,7 +1277,7 @@ object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
try {
val exitCode = failAfter(60 seconds) { process.waitFor() }
val exitCode = failAfter(1.minute) { process.waitFor() }
if (exitCode != 0) {
fail(s"Process returned with exit code $exitCode. See the log4j logs for more detail.")
}


@ -67,7 +67,7 @@ class StandaloneDynamicAllocationSuite
master = makeMaster()
workers = makeWorkers(10, 2048)
// Wait until all workers register with master successfully
eventually(timeout(60.seconds), interval(10.millis)) {
eventually(timeout(1.minute), interval(10.milliseconds)) {
assert(getMasterState.workers.size === numWorkers)
}
}


@ -65,7 +65,7 @@ class AppClientSuite
master = makeMaster()
workers = makeWorkers(10, 2048)
// Wait until all workers register with master successfully
eventually(timeout(60.seconds), interval(10.millis)) {
eventually(timeout(1.minute), interval(10.milliseconds)) {
assert(getMasterState.workers.size === numWorkers)
}
}


@ -25,7 +25,6 @@ import java.util.zip.{ZipInputStream, ZipOutputStream}
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps
import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.io.FileUtils
@ -142,7 +141,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
clock.getTimeMillis(), "test", false))
// Make sure the UI can be rendered.
list.foreach { case info =>
list.foreach { info =>
val appUi = provider.getAppUI(info.id, None)
appUi should not be null
appUi should not be None
@ -281,7 +280,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
list.last.attempts.size should be (3)
list.head.attempts.head.attemptId should be (Some("attempt1"))
list.foreach { case app =>
list.foreach { app =>
app.attempts.foreach { attempt =>
val appUi = provider.getAppUI(app.id, attempt.attemptId)
appUi should not be null
@ -734,7 +733,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
provider.inSafeMode = false
clock.setTime(10000)
eventually(timeout(1 second), interval(10 millis)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
provider.getConfig().keys should not contain ("HDFS State")
}
} finally {
@ -747,12 +746,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val clock = new ManualClock()
val provider = new SafeModeTestProvider(createTestConf(), clock)
val errorHandler = mock(classOf[Thread.UncaughtExceptionHandler])
val initThread = provider.startSafeModeCheckThread(Some(errorHandler))
provider.startSafeModeCheckThread(Some(errorHandler))
try {
provider.inSafeMode = false
clock.setTime(10000)
eventually(timeout(1 second), interval(10 millis)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
verify(errorHandler).uncaughtException(any(), any())
}
} finally {


@ -25,9 +25,7 @@ import javax.servlet.http.{HttpServletRequest, HttpServletRequestWrapper, HttpSe
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps
import com.codahale.metrics.Counter
import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@ -457,16 +455,6 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
val port = server.boundPort
val metrics = server.cacheMetrics
// assert that a metric has a value; if not dump the whole metrics instance
def assertMetric(name: String, counter: Counter, expected: Long): Unit = {
val actual = counter.getCount
if (actual != expected) {
// this is here because Scalatest loses stack depth
fail(s"Wrong $name value - expected $expected but got $actual" +
s" in metrics\n$metrics")
}
}
// build a URL for an app or app/attempt plus a page underneath
def buildURL(appId: String, suffix: String): URL = {
new URL(s"http://localhost:$port/history/$appId$suffix")
@ -477,13 +465,11 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
new URL(s"http://localhost:$port/api/v1/applications/$appId$suffix")
}
val historyServerRoot = new URL(s"http://localhost:$port/")
// start initial job
val d = sc.parallelize(1 to 10)
d.count()
val stdInterval = interval(100 milliseconds)
val appId = eventually(timeout(20 seconds), stdInterval) {
val stdInterval = interval(100.milliseconds)
val appId = eventually(timeout(20.seconds), stdInterval) {
val json = getContentAndCode("applications", port)._2.get
val apps = parse(json).asInstanceOf[JArray].arr
apps should have size 1
@ -567,7 +553,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
d2.count()
dumpLogDir("After second job")
val stdTimeout = timeout(10 seconds)
val stdTimeout = timeout(10.seconds)
logDebug("waiting for UI to update")
eventually(stdTimeout, stdInterval) {
assert(2 === getNumJobs(""),


@ -26,7 +26,6 @@ import scala.collection.mutable
import scala.collection.mutable.{HashMap, HashSet}
import scala.concurrent.duration._
import scala.io.Source
import scala.language.postfixOps
import scala.reflect.ClassTag
import org.json4s._
@ -216,7 +215,7 @@ class MasterSuite extends SparkFunSuite
master = makeMaster(conf)
master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
// Wait until Master recover from checkpoint data.
eventually(timeout(5 seconds), interval(100 milliseconds)) {
eventually(timeout(5.seconds), interval(100.milliseconds)) {
master.workers.size should be(1)
}
@ -234,7 +233,7 @@ class MasterSuite extends SparkFunSuite
fakeWorkerInfo.coresUsed should be(0)
master.self.send(MasterChangeAcknowledged(fakeAppInfo.id))
eventually(timeout(1 second), interval(10 milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
// Application state should be WAITING when "MasterChangeAcknowledged" event executed.
fakeAppInfo.state should be(ApplicationState.WAITING)
}
@ -242,7 +241,7 @@ class MasterSuite extends SparkFunSuite
master.self.send(
WorkerSchedulerStateResponse(fakeWorkerInfo.id, fakeExecutors, Seq(fakeDriverInfo.id)))
eventually(timeout(5 seconds), interval(100 milliseconds)) {
eventually(timeout(5.seconds), interval(100.milliseconds)) {
getState(master) should be(RecoveryState.ALIVE)
}
@ -267,7 +266,7 @@ class MasterSuite extends SparkFunSuite
val localCluster = new LocalSparkCluster(2, 2, 512, conf)
localCluster.start()
try {
eventually(timeout(5 seconds), interval(100 milliseconds)) {
eventually(timeout(5.seconds), interval(100.milliseconds)) {
val json = Source.fromURL(s"http://localhost:${localCluster.masterWebUIPort}/json")
.getLines().mkString("\n")
val JArray(workers) = (parse(json) \ "workers")
@ -293,7 +292,7 @@ class MasterSuite extends SparkFunSuite
val localCluster = new LocalSparkCluster(2, 2, 512, conf)
localCluster.start()
try {
eventually(timeout(5 seconds), interval(100 milliseconds)) {
eventually(timeout(5.seconds), interval(100.milliseconds)) {
val json = Source.fromURL(s"http://localhost:${localCluster.masterWebUIPort}/json")
.getLines().mkString("\n")
val JArray(workers) = (parse(json) \ "workers")


@ -228,7 +228,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
testCleanupFilesWithConfig(false)
}
private def testCleanupFilesWithConfig(value: Boolean) = {
private def testCleanupFilesWithConfig(value: Boolean): Unit = {
val conf = new SparkConf().set(config.STORAGE_CLEANUP_FILES_AFTER_EXECUTOR_EXIT, value)
val cleanupCalled = new AtomicBoolean(false)
@ -257,7 +257,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
testWorkDirCleanupAndRemoveMetadataWithConfig(false)
}
private def testWorkDirCleanupAndRemoveMetadataWithConfig(dbCleanupEnabled: Boolean) = {
private def testWorkDirCleanupAndRemoveMetadataWithConfig(dbCleanupEnabled: Boolean): Unit = {
val conf = new SparkConf().set("spark.shuffle.service.db.enabled", dbCleanupEnabled.toString)
conf.set("spark.worker.cleanup.appDataTtl", "60")
conf.set("spark.shuffle.service.enabled", "true")
@ -282,7 +282,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
}
executorDir.setLastModified(System.currentTimeMillis - (1000 * 120))
worker.receive(WorkDirCleanup)
eventually(timeout(1000.milliseconds), interval(10.milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
assert(!executorDir.exists())
assert(cleanupCalled.get() == dbCleanupEnabled)
}


@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Map
import scala.concurrent.duration._
import scala.language.postfixOps
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.{any, eq => meq}


@ -20,7 +20,6 @@ package org.apache.spark.launcher
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import scala.language.postfixOps
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._
@ -57,13 +56,13 @@ class LauncherBackendSuite extends SparkFunSuite with Matchers {
.startApplication()
try {
eventually(timeout(30 seconds), interval(100 millis)) {
eventually(timeout(30.seconds), interval(100.milliseconds)) {
handle.getAppId() should not be (null)
}
handle.stop()
eventually(timeout(30 seconds), interval(100 millis)) {
eventually(timeout(30.seconds), interval(100.milliseconds)) {
handle.getState() should be (SparkAppHandle.State.KILLED)
}
} finally {


@ -142,7 +142,7 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
}
assert(f.get() === 10)
failAfter(10 seconds) {
failAfter(10.seconds) {
sem.acquire(2)
}
}
@ -178,7 +178,7 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
f.get()
}
failAfter(10 seconds) {
failAfter(10.seconds) {
sem.acquire(2)
}
}


@ -18,7 +18,6 @@
package org.apache.spark.rdd
import scala.concurrent.duration._
import scala.language.postfixOps
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
@ -174,7 +173,7 @@ class LocalCheckpointSuite extends SparkFunSuite with LocalSparkContext {
val blockId = RDDBlockId(rdd.id, numPartitions - 1)
bmm.removeBlock(blockId)
// Wait until the block has been removed successfully.
eventually(timeout(1 seconds), interval(100 milliseconds)) {
eventually(timeout(1.second), interval(100.milliseconds)) {
assert(bmm.getBlockStatus(blockId).isEmpty)
}
try {


@ -26,7 +26,6 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
import com.google.common.io.Files
import org.mockito.ArgumentMatchers.any
@ -80,7 +79,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
})
rpcEndpointRef.send("hello")
eventually(timeout(5 seconds), interval(10 millis)) {
eventually(timeout(5.seconds), interval(10.milliseconds)) {
assert("hello" === message)
}
}
@ -101,7 +100,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "send-remotely")
try {
rpcEndpointRef.send("hello")
eventually(timeout(5 seconds), interval(10 millis)) {
eventually(timeout(5.seconds), interval(10.milliseconds)) {
assert("hello" === message)
}
} finally {
@ -180,7 +179,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "ask-timeout")
try {
val e = intercept[RpcTimeoutException] {
rpcEndpointRef.askSync[String]("hello", new RpcTimeout(1 millis, shortProp))
rpcEndpointRef.askSync[String]("hello", new RpcTimeout(1.millisecond, shortProp))
}
// The SparkException cause should be a RpcTimeoutException with message indicating the
// controlling timeout property
@ -236,7 +235,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
})
eventually(timeout(5 seconds), interval(10 millis)) {
eventually(timeout(5.seconds), interval(10.milliseconds)) {
assert(e.getMessage === "Oops!")
}
}
@ -261,7 +260,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
env.stop(endpointRef)
eventually(timeout(5 seconds), interval(10 millis)) {
eventually(timeout(5.seconds), interval(10.milliseconds)) {
assert(e.getMessage === "Oops!")
}
}
@ -282,7 +281,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
endpointRef.send("Foo")
eventually(timeout(5 seconds), interval(10 millis)) {
eventually(timeout(5.seconds), interval(10.milliseconds)) {
assert(e.getMessage === "Oops!")
}
}
@ -303,7 +302,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
})
eventually(timeout(5 seconds), interval(10 millis)) {
eventually(timeout(5.seconds), interval(10.milliseconds)) {
// Calling `self` in `onStart` is fine
assert(callSelfSuccessfully)
}
@ -324,7 +323,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
endpointRef.send("Foo")
eventually(timeout(5 seconds), interval(10 millis)) {
eventually(timeout(5.seconds), interval(10.milliseconds)) {
// Calling `self` in `receive` is fine
assert(callSelfSuccessfully)
}
@ -347,9 +346,9 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
env.stop(endpointRef)
eventually(timeout(5 seconds), interval(10 millis)) {
eventually(timeout(5.seconds), interval(10.milliseconds)) {
// Calling `self` in `onStop` will return null, so selfOption will be None
assert(selfOption == None)
assert(selfOption.isEmpty)
}
}
@ -376,7 +375,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}.start()
}
eventually(timeout(5 seconds), interval(5 millis)) {
eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(result == 1000)
}
@ -401,7 +400,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
env.stop(endpointRef)
env.stop(endpointRef)
eventually(timeout(5 seconds), interval(5 millis)) {
eventually(timeout(5.seconds), interval(5.milliseconds)) {
// Calling stop twice should only trigger onStop once.
assert(onStopCount == 1)
}
@ -417,7 +416,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
})
val f = endpointRef.ask[String]("Hi")
val ack = ThreadUtils.awaitResult(f, 5 seconds)
val ack = ThreadUtils.awaitResult(f, 5.seconds)
assert("ack" === ack)
env.stop(endpointRef)
@ -437,7 +436,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "sendWithReply-remotely")
try {
val f = rpcEndpointRef.ask[String]("hello")
val ack = ThreadUtils.awaitResult(f, 5 seconds)
val ack = ThreadUtils.awaitResult(f, 5.seconds)
assert("ack" === ack)
} finally {
anotherEnv.shutdown()
@ -456,7 +455,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val f = endpointRef.ask[String]("Hi")
val e = intercept[SparkException] {
ThreadUtils.awaitResult(f, 5 seconds)
ThreadUtils.awaitResult(f, 5.seconds)
}
assert("Oops" === e.getCause.getMessage)
@ -478,7 +477,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
try {
val f = rpcEndpointRef.ask[String]("hello")
val e = intercept[SparkException] {
ThreadUtils.awaitResult(f, 5 seconds)
ThreadUtils.awaitResult(f, 5.seconds)
}
assert("Oops" === e.getCause.getMessage)
} finally {
@ -530,14 +529,14 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
// Send a message to set up the connection
serverRefInServer2.send("hello")
eventually(timeout(5 seconds), interval(5 millis)) {
eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(events.contains(("onConnected", serverEnv2.address)))
}
serverEnv2.shutdown()
serverEnv2.awaitTermination()
eventually(timeout(5 seconds), interval(5 millis)) {
eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(events.contains(("onConnected", serverEnv2.address)))
assert(events.contains(("onDisconnected", serverEnv2.address)))
}
@ -558,7 +557,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
// Send a message to set up the connection
serverRefInClient.send("hello")
eventually(timeout(5 seconds), interval(5 millis)) {
eventually(timeout(5.seconds), interval(5.milliseconds)) {
// We don't know the exact client address but at least we can verify the message type
assert(events.asScala.map(_._1).exists(_ == "onConnected"))
}
@ -566,7 +565,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
clientEnv.shutdown()
clientEnv.awaitTermination()
eventually(timeout(5 seconds), interval(5 millis)) {
eventually(timeout(5.seconds), interval(5.milliseconds)) {
// We don't know the exact client address but at least we can verify the message type
assert(events.asScala.map(_._1).exists(_ == "onConnected"))
assert(events.asScala.map(_._1).exists(_ == "onDisconnected"))
@ -589,14 +588,14 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
// Send a message to set up the connection
serverRefInClient.send("hello")
eventually(timeout(5 seconds), interval(5 millis)) {
eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(events.contains(("onConnected", serverEnv.address)))
}
serverEnv.shutdown()
serverEnv.awaitTermination()
eventually(timeout(5 seconds), interval(5 millis)) {
eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(events.contains(("onConnected", serverEnv.address)))
assert(events.contains(("onDisconnected", serverEnv.address)))
}
@ -624,7 +623,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
try {
val f = rpcEndpointRef.ask[String]("hello")
val e = intercept[SparkException] {
ThreadUtils.awaitResult(f, 1 seconds)
ThreadUtils.awaitResult(f, 1.second)
}
assert(e.getCause.isInstanceOf[NotSerializableException])
} finally {
@ -658,7 +657,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
})
val rpcEndpointRef = remoteEnv.setupEndpointRef(localEnv.address, "send-authentication")
rpcEndpointRef.send("hello")
eventually(timeout(5 seconds), interval(10 millis)) {
eventually(timeout(5.seconds), interval(10.milliseconds)) {
assert("hello" === message)
}
} finally {
@ -778,8 +777,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
})
val longTimeout = new RpcTimeout(1 second, "spark.rpc.long.timeout")
val shortTimeout = new RpcTimeout(10 millis, "spark.rpc.short.timeout")
val longTimeout = new RpcTimeout(1.second, "spark.rpc.long.timeout")
val shortTimeout = new RpcTimeout(10.milliseconds, "spark.rpc.short.timeout")
// Ask with immediate response, should complete successfully
val fut1 = rpcEndpointRef.ask[String]("hello", longTimeout)
@ -804,7 +803,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
// once the future is complete to verify addMessageIfTimeout was invoked
val reply3 =
intercept[RpcTimeoutException] {
Await.result(fut3, 2000 millis)
Await.result(fut3, 2.seconds)
}.getMessage
// scalastyle:on awaitresult


@ -36,7 +36,7 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer}
class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext
with Eventually {
private val executorUpTimeout = 60.seconds
private val executorUpTimeout = 1.minute
test("serialized task larger than max RPC message size") {
val conf = new SparkConf


@ -594,7 +594,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
var rdd: RDD[_] = new MyRDD(sc, 1, Nil)
(1 to 30).foreach(_ => rdd = rdd.zip(rdd))
// getPreferredLocs runs quickly, indicating that exponential graph traversal is avoided.
failAfter(10 seconds) {
failAfter(10.seconds) {
val preferredLocs = scheduler.getPreferredLocs(rdd, 0)
// No preferred locations are returned.
assert(preferredLocs.length === 0)


@ -22,7 +22,6 @@ import java.util.Date
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.language.postfixOps
import org.apache.hadoop.mapred._
import org.apache.hadoop.mapreduce.TaskType
@ -159,7 +158,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
// It's an error if the job completes successfully even though no committer was authorized,
// so throw an exception if the job was allowed to complete.
intercept[TimeoutException] {
ThreadUtils.awaitResult(futureAction, 5 seconds)
ThreadUtils.awaitResult(futureAction, 5.seconds)
}
assert(tempDir.list().size === 0)
}


@ -170,7 +170,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
// and notifies the job waiter before our original thread in the task scheduler finishes
// handling the event and marks the taskset as complete. So its ok if we need to wait a
// *little* bit longer for the original taskscheduler thread to finish up to deal w/ the race.
eventually(timeout(1 second), interval(10 millis)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
assert(taskScheduler.runningTaskSets.isEmpty)
}
assert(!backend.hasTasks)


@ -23,7 +23,6 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.control.NonFatal
import com.google.common.util.concurrent.MoreExecutors
@ -58,18 +57,18 @@ private class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: Task
// Only remove the result once, since we'd like to test the case where the task eventually
// succeeds.
serializer.get().deserialize[TaskResult[_]](serializedData) match {
case IndirectTaskResult(blockId, size) =>
case IndirectTaskResult(blockId, _) =>
sparkEnv.blockManager.master.removeBlock(blockId)
// removeBlock is asynchronous. Need to wait it's removed successfully
try {
eventually(timeout(3 seconds), interval(200 milliseconds)) {
eventually(timeout(3.seconds), interval(200.milliseconds)) {
assert(!sparkEnv.blockManager.master.contains(blockId))
}
removeBlockSuccessfully = true
} catch {
case NonFatal(e) => removeBlockSuccessfully = false
}
case directResult: DirectTaskResult[_] =>
case _: DirectTaskResult[_] =>
taskSetManager.abort("Internal error: expect only indirect results")
}
serializedData.rewind()


@ -22,7 +22,6 @@ import java.util.Locale
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.language.postfixOps
import org.mockito.Mockito.{mock, when}
import org.scalatest.{BeforeAndAfter, Matchers}
@ -247,7 +246,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
// Add another normal block manager and test that 2x replication works
makeBlockManager(10000, "anotherStore")
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
assert(replicateAndGetNumCopies("a2") === 2)
}
}
@ -272,14 +271,14 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
// Add another store, 3x replication should work now, 4x replication should only replicate 3x
val newStore1 = makeBlockManager(storeSize, s"newstore1")
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
assert(replicateAndGetNumCopies("a3", 3) === 3)
}
assert(replicateAndGetNumCopies("a4", 4) === 3)
// Add another store, 4x replication should work now
val newStore2 = makeBlockManager(storeSize, s"newstore2")
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
assert(replicateAndGetNumCopies("a5", 4) === 4)
}
@ -295,7 +294,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
val newStores = (3 to 5).map {
i => makeBlockManager(storeSize, s"newstore$i")
}
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
assert(replicateAndGetNumCopies("a7", 3) === 3)
}
}
@ -454,13 +453,13 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav
master.removeExecutor(bm.blockManagerId.executorId)
bm.stop()
// giving enough time for replication to happen and new block be reported to master
eventually(timeout(5 seconds), interval(100 millis)) {
eventually(timeout(5.seconds), interval(100.milliseconds)) {
val newLocations = master.getLocations(blockId).toSet
assert(newLocations.size === replicationFactor)
}
}
val newLocations = eventually(timeout(5 seconds), interval(100 millis)) {
val newLocations = eventually(timeout(5.seconds), interval(100.milliseconds)) {
val _newLocations = master.getLocations(blockId).toSet
assert(_newLocations.size === replicationFactor)
_newLocations
@ -472,7 +471,7 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav
"New locations contain stopped block managers.")
// Make sure all locks have been released.
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
initialStores.filter(bm => newLocations.contains(bm.blockManagerId)).foreach { bm =>
assert(bm.blockInfoManager.getTaskLockCount(BlockInfo.NON_TASK_WRITER) === 0)
}


@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.{implicitConversions, postfixOps}
import scala.language.implicitConversions
import scala.reflect.ClassTag
import org.apache.commons.lang3.RandomUtils
@ -275,19 +275,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
master.removeBlock("a2-to-remove")
master.removeBlock("a3-to-remove")
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
assert(!store.hasLocalBlock("a1-to-remove"))
master.getLocations("a1-to-remove") should have size 0
}
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
assert(!store.hasLocalBlock("a2-to-remove"))
master.getLocations("a2-to-remove") should have size 0
}
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
assert(store.hasLocalBlock("a3-to-remove"))
master.getLocations("a3-to-remove") should have size 0
}
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
val memStatus = master.getMemoryStatus.head._2
memStatus._1 should equal (40000L)
memStatus._2 should equal (40000L)
@ -305,15 +305,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
master.removeRdd(0, blocking = false)
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
master.getLocations(rdd(0, 0)) should have size 0
}
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
store.getSingleAndReleaseLock(rdd(0, 1)) should be (None)
master.getLocations(rdd(0, 1)) should have size 0
}
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
store.getSingleAndReleaseLock("nonrddblock") should not be (None)
master.getLocations("nonrddblock") should have size (1)
}
@ -378,7 +378,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// remove broadcast 1 block from both the stores asynchronously
// and verify all broadcast 1 blocks have been removed
master.removeBroadcast(1, removeFromMaster = true, blocking = false)
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
assert(!driverStore.hasLocalBlock(broadcast1BlockId))
assert(!executorStore.hasLocalBlock(broadcast1BlockId))
}
@ -386,7 +386,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// remove broadcast 2 from both the stores asynchronously
// and verify all broadcast 2 blocks have been removed
master.removeBroadcast(2, removeFromMaster = true, blocking = false)
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
assert(!driverStore.hasLocalBlock(broadcast2BlockId))
assert(!driverStore.hasLocalBlock(broadcast2BlockId2))
assert(!executorStore.hasLocalBlock(broadcast2BlockId))
@ -905,7 +905,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
// Make sure get a1 doesn't hang and returns None.
failAfter(1 second) {
failAfter(1.second) {
assert(store.getSingleAndReleaseLock("a1").isEmpty, "a1 should not be in store")
}
}


@ -123,12 +123,12 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
val ui = sc.ui.get
val rdd = sc.parallelize(Seq(1, 2, 3))
rdd.persist(StorageLevels.DISK_ONLY).count()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(ui, "/storage")
val tableRowText = findAll(cssSelector("#storage-by-rdd-table td")).map(_.text).toSeq
tableRowText should contain (StorageLevels.DISK_ONLY.description)
}
eventually(timeout(5 seconds), interval(50 milliseconds)) {
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(ui, "/storage/rdd/?id=0")
val tableRowText = findAll(cssSelector("#rdd-storage-by-block-table td")).map(_.text).toSeq
tableRowText should contain (StorageLevels.DISK_ONLY.description)
@ -143,12 +143,12 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
rdd.unpersist(blocking = true)
rdd.persist(StorageLevels.MEMORY_ONLY).count()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(ui, "/storage")
val tableRowText = findAll(cssSelector("#storage-by-rdd-table td")).map(_.text).toSeq
tableRowText should contain (StorageLevels.MEMORY_ONLY.description)
}
eventually(timeout(5 seconds), interval(50 milliseconds)) {
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(ui, "/storage/rdd/?id=0")
val tableRowText = findAll(cssSelector("#rdd-storage-by-block-table td")).map(_.text).toSeq
tableRowText should contain (StorageLevels.MEMORY_ONLY.description)
@ -203,7 +203,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
intercept[SparkException] {
sc.parallelize(1 to 10).map { x => throw new Exception()}.collect()
}
eventually(timeout(5 seconds), interval(50 milliseconds)) {
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/stages")
find(id("active")) should be(None) // Since we hide empty tables
find(id("failed")).get.text should be("Failed Stages (1)")
@ -218,7 +218,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
intercept[SparkException] {
sc.parallelize(1 to 10).map { x => unserializableObject}.collect()
}
eventually(timeout(5 seconds), interval(50 milliseconds)) {
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/stages")
find(id("active")) should be(None) // Since we hide empty tables
// The failure occurs before the stage becomes active, hence we should still show only one
@ -239,7 +239,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
withSpark(newSparkContext(killEnabled = true)) { sc =>
runSlowJob(sc)
eventually(timeout(5 seconds), interval(50 milliseconds)) {
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
assert(hasKillLink)
}
@ -247,7 +247,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
withSpark(newSparkContext(killEnabled = false)) { sc =>
runSlowJob(sc)
eventually(timeout(5 seconds), interval(50 milliseconds)) {
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
assert(!hasKillLink)
}
@ -255,7 +255,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
withSpark(newSparkContext(killEnabled = true)) { sc =>
runSlowJob(sc)
eventually(timeout(5 seconds), interval(50 milliseconds)) {
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/stages")
assert(hasKillLink)
}
@ -263,7 +263,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
withSpark(newSparkContext(killEnabled = false)) { sc =>
runSlowJob(sc)
eventually(timeout(5 seconds), interval(50 milliseconds)) {
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/stages")
assert(!hasKillLink)
}
@ -274,7 +274,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
withSpark(newSparkContext()) { sc =>
// If no job has been run in a job group, then "(Job Group)" should not appear in the header
sc.parallelize(Seq(1, 2, 3)).count()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
tableHeaders(0) should not startWith "Job Id (Job Group)"
@ -282,7 +282,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
// Once at least one job has been run in a job group, then we should display the group name:
sc.setJobGroup("my-job-group", "my-job-group-description")
sc.parallelize(Seq(1, 2, 3)).count()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
// Can suffix up/down arrow in the header
@ -325,7 +325,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
}
}
mappedData.count()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
find(cssSelector(".stage-progress-cell")).get.text should be ("2/2 (1 failed)")
find(cssSelector(".progress-cell .progress")).get.text should be ("2/2 (1 failed)")
@ -373,7 +373,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
}.groupBy(identity).map(identity).groupBy(identity).map(identity)
// Start the job:
rdd.countAsync()
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs/job/?id=0")
find(id("active")).get.text should be ("Active Stages (1)")
find(id("pending")).get.text should be ("Pending Stages (2)")
@ -399,7 +399,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
// mentioned in its job start event but which were never actually executed:
rdd.count()
rdd.count()
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
// The completed jobs table should have two rows. The first row will be the most recent job:
val firstRow = find(cssSelector("tbody tr")).get.underlying
@ -426,7 +426,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
// mentioned in its job start event but which were never actually executed:
rdd.count()
rdd.count()
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs/job/?id=1")
find(id("pending")) should be (None)
find(id("active")) should be (None)
@ -454,7 +454,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
// mentioned in its job start event but which were never actually executed:
rdd.count()
rdd.count()
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
findAll(cssSelector("tbody tr a")).foreach { link =>
link.text.toLowerCase(Locale.ROOT) should include ("count")
@ -476,7 +476,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
})
}
sparkUI.attachTab(newTab)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "")
find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
@ -484,13 +484,13 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
find(cssSelector("""ul li a[href*="environment"]""")) should not be(None)
find(cssSelector("""ul li a[href*="foo"]""")) should not be(None)
}
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
// check whether new page exists
goToUi(sc, "/foo")
find(cssSelector("b")).get.text should include ("html magic")
}
sparkUI.detachTab(newTab)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "")
find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
@ -498,7 +498,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
find(cssSelector("""ul li a[href*="environment"]""")) should not be(None)
find(cssSelector("""ul li a[href*="foo"]""")) should be(None)
}
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
// check new page not exist
goToUi(sc, "/foo")
find(cssSelector("b")) should be(None)
@ -509,7 +509,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
test("kill stage POST/GET response is correct") {
withSpark(newSparkContext(killEnabled = true)) { sc =>
sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
eventually(timeout(5.seconds), interval(50.milliseconds)) {
val url = new URL(
sc.ui.get.webUrl.stripSuffix("/") + "/stages/stage/kill/?id=0")
// SPARK-6846: should be POST only but YARN AM doesn't proxy POST
@ -522,7 +522,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
test("kill job POST/GET response is correct") {
withSpark(newSparkContext(killEnabled = true)) { sc =>
sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
eventually(timeout(5.seconds), interval(50.milliseconds)) {
val url = new URL(
sc.ui.get.webUrl.stripSuffix("/") + "/jobs/job/kill/?id=0")
// SPARK-6846: should be POST only but YARN AM doesn't proxy POST
@ -560,7 +560,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
("8", "count")
)
eventually(timeout(1 second), interval(50 milliseconds)) {
eventually(timeout(1.second), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
// The completed jobs table should have two rows. The first row will be the most recent job:
find("completed-summary").get.text should be ("Completed Jobs: 10, only showing 2")
@ -606,7 +606,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
("17", "groupBy")
)
eventually(timeout(1 second), interval(50 milliseconds)) {
eventually(timeout(1.second), interval(50.milliseconds)) {
goToUi(sc, "/stages")
find("completed-summary").get.text should be ("Completed Stages: 20, only showing 3")
find("completed").get.text should be ("Completed Stages (20, only showing 3)")
@ -679,7 +679,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity)
rdd.count()
eventually(timeout(5 seconds), interval(100 milliseconds)) {
eventually(timeout(5.seconds), interval(100.milliseconds)) {
val stage0 = Source.fromURL(sc.ui.get.webUrl +
"/stages/stage/?id=0&attempt=0&expandDagViz=true").mkString
assert(stage0.contains("digraph G {\n subgraph clusterstage_0 {\n " +
@ -718,7 +718,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
rdd.count()
rdd.count()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/stages")
find(id("skipped")).get.text should be("Skipped Stages (1)")
}
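
The UI-suite hunks above only change how the timeouts are spelled: `10 seconds` becomes `10.seconds`, an ordinary method call that needs no `scala.language.postfixOps` import. A minimal standalone sketch of the pattern, assuming ScalaTest's `Eventually` and `SpanSugar` on the classpath (the object and the `ready` flag are illustrative, not from the patch):

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallyDotSyntaxExample {
  @volatile private var ready = false

  def main(args: Array[String]): Unit = {
    new Thread(() => { Thread.sleep(200); ready = true }).start()
    // 10.seconds and 50.milliseconds are plain method calls on Int,
    // so no postfixOps import is needed and no warning is emitted.
    eventually(timeout(10.seconds), interval(50.milliseconds)) {
      assert(ready, "flag was never set")
    }
  }
}
```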

View file

@ -75,7 +75,7 @@ class UISuite extends SparkFunSuite {
ignore("basic ui visibility") {
withSpark(newSparkContext()) { sc =>
// test if the ui is visible, and all the expected tabs are visible
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
val html = Source.fromURL(sc.ui.get.webUrl).mkString
assert(!html.contains("random data that should not be present"))
assert(html.toLowerCase(Locale.ROOT).contains("stages"))
@ -89,7 +89,7 @@ class UISuite extends SparkFunSuite {
ignore("visibility at localhost:4040") {
withSpark(newSparkContext()) { sc =>
// test if visible from http://localhost:4040
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
val html = Source.fromURL("http://localhost:4040").mkString
assert(html.toLowerCase(Locale.ROOT).contains("stages"))
}

View file

@ -21,7 +21,6 @@ import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch}
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.concurrent.Eventually._
@ -45,7 +44,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
}
eventLoop.start()
(1 to 100).foreach(eventLoop.post)
eventually(timeout(5 seconds), interval(5 millis)) {
eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert((1 to 100) === buffer.asScala.toSeq)
}
eventLoop.stop()
@ -80,7 +79,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
}
eventLoop.start()
eventLoop.post(1)
eventually(timeout(5 seconds), interval(5 millis)) {
eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(e === receivedError)
}
eventLoop.stop()
@ -102,7 +101,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
}
eventLoop.start()
eventLoop.post(1)
eventually(timeout(5 seconds), interval(5 millis)) {
eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(e === receivedError)
assert(eventLoop.isActive)
}
@ -157,7 +156,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
}.start()
}
eventually(timeout(5 seconds), interval(5 millis)) {
eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(threadNum * eventsFromEachThread === receivedEventsCount)
}
eventLoop.stop()
@ -182,7 +181,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
}
eventLoop.start()
eventLoop.post(1)
failAfter(5 seconds) {
failAfter(5.seconds) {
// Wait until we enter `onReceive`
onReceiveLatch.await()
eventLoop.stop()
@ -203,7 +202,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
}
eventLoop.start()
eventLoop.post(1)
eventually(timeout(5 seconds), interval(5 millis)) {
eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(!eventLoop.isActive)
}
}
@ -227,7 +226,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
}
}
eventLoop.start()
eventually(timeout(5 seconds), interval(5 millis)) {
eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(!eventLoop.isActive)
}
assert(onStopCalled)
@ -250,7 +249,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
}
eventLoop.start()
eventLoop.post(1)
eventually(timeout(5 seconds), interval(5 millis)) {
eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(!eventLoop.isActive)
}
assert(onStopCalled)
@ -274,7 +273,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
}
eventLoop.start()
eventLoop.post(1)
eventually(timeout(5 seconds), interval(5 millis)) {
eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(!eventLoop.isActive)
}
assert(onStopCalled)
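
`failAfter` accepts the same dot-form spans. A sketch of the call used in the EventLoopSuite hunks above, assuming ScalaTest 3.x `TimeLimits` (the signaler and the sleep are illustrative):

```scala
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.time.SpanSugar._

object FailAfterDotSyntaxExample extends TimeLimits {
  // ScalaTest 3.x asks for an implicit Signaler to decide how to interrupt the block.
  private implicit val signaler: Signaler = ThreadSignaler

  def main(args: Array[String]): Unit = {
    failAfter(5.seconds) {   // previously written as "failAfter(5 seconds)"
      Thread.sleep(100)      // stands in for work that must finish within the limit
    }
  }
}
```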

View file

@ -19,7 +19,6 @@ package org.apache.spark.util.collection
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.ref.WeakReference
import org.scalatest.Matchers
@ -460,7 +459,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
// (lines 69-89)
// assert(map.currentMap == null)
eventually(timeout(5 seconds), interval(200 milliseconds)) {
eventually(timeout(5.seconds), interval(200.milliseconds)) {
System.gc()
// direct asserts introduced some macro generated code that held a reference to the map
val tmpIsNull = null == underlyingMapRef.get.orNull
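
The hunk above keeps the map reachable only through a weak reference and polls under `eventually` until a GC clears it. A rough sketch of that pattern with illustrative names:

```scala
import scala.ref.WeakReference
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object WeakReferenceGcExample {
  def main(args: Array[String]): Unit = {
    var payload: Array[Byte] = new Array[Byte](1 << 20)
    val ref = new WeakReference(payload)
    payload = null // drop the only strong reference
    // Retry System.gc() until the collector has actually cleared the referent.
    eventually(timeout(5.seconds), interval(200.milliseconds)) {
      System.gc()
      assert(ref.get.isEmpty, "referent is still strongly reachable somewhere")
    }
  }
}
```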

View file

@ -129,7 +129,7 @@ abstract class DockerJDBCIntegrationSuite
// Start the container and wait until the database can accept JDBC connections:
docker.startContainer(containerId)
jdbcUrl = db.getJdbcUrl(dockerIp, externalPort)
eventually(timeout(60.seconds), interval(1.seconds)) {
eventually(timeout(1.minute), interval(1.second)) {
val conn = java.sql.DriverManager.getConnection(jdbcUrl)
conn.close()
}

View file

@ -99,7 +99,7 @@ class KafkaDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTe
testUtils.createTopic(topic, partitions = 1)
testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray)
eventually(timeout(60.seconds)) {
eventually(timeout(1.minute)) {
assert(
testUtils.getEarliestOffsets(Set(topic)).head._2 > 0,
"Kafka didn't delete records after 1 minute")

View file

@ -24,7 +24,6 @@ import java.util.{Collections, Map => JMap, Properties, UUID}
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.language.postfixOps
import scala.util.Random
import kafka.api.Request
@ -138,7 +137,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
setupEmbeddedZookeeper()
setupEmbeddedKafkaServer()
eventually(timeout(60.seconds)) {
eventually(timeout(1.minute)) {
assert(zkUtils.getAllBrokersInCluster().nonEmpty, "Broker was not up in 60 seconds")
}
}
@ -414,7 +413,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
topic: String,
numPartitions: Int,
servers: Seq[KafkaServer]) {
eventually(timeout(60.seconds), interval(200.millis)) {
eventually(timeout(1.minute), interval(200.milliseconds)) {
try {
verifyTopicDeletion(topic, numPartitions, servers)
} catch {
@ -439,7 +438,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
case _ =>
false
}
eventually(timeout(60.seconds)) {
eventually(timeout(1.minute)) {
assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
}
}
@ -448,7 +447,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
* Wait until the latest offset of the given `TopicPartition` is not less than `offset`.
*/
def waitUntilOffsetAppears(topicPartition: TopicPartition, offset: Long): Unit = {
eventually(timeout(60.seconds)) {
eventually(timeout(1.minute)) {
val currentOffset = getLatestOffsets(Set(topicPartition.topic)).get(topicPartition)
assert(currentOffset.nonEmpty && currentOffset.get >= offset)
}
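
Several hunks above and below only rewrite the same amount of time in larger units (`60.seconds` as `1.minute`, `100000.milliseconds` as `100.seconds`, `200.millis` as `200.milliseconds`). A small sketch with `scala.concurrent.duration` showing the values are identical, so only readability changes (the test suites may resolve the same syntax through ScalaTest's `SpanSugar` instead, with the same effect):

```scala
import scala.concurrent.duration._

object DurationEquivalenceExample {
  def main(args: Array[String]): Unit = {
    // FiniteDuration equality compares the total length, not the unit it was written in.
    assert(1.minute == 60.seconds)
    assert(100.seconds == 100000.milliseconds)
    assert(200.millis == 200.milliseconds)
    println(1.minute.toSeconds) // 60
  }
}
```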

View file

@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random
import org.apache.kafka.clients.consumer._
@ -153,7 +152,7 @@ class DirectKafkaStreamSuite
allReceived.addAll(Arrays.asList(rdd.map(r => (r.key, r.value)).collect(): _*))
}
ssc.start()
eventually(timeout(100000.milliseconds), interval(1000.milliseconds)) {
eventually(timeout(100.seconds), interval(1.second)) {
assert(allReceived.size === expectedTotal,
"didn't get expected number of messages, messages:\n" +
allReceived.asScala.mkString("\n"))
@ -219,7 +218,7 @@ class DirectKafkaStreamSuite
allReceived.addAll(Arrays.asList(rdd.map(r => (r.key, r.value)).collect(): _*))
}
ssc.start()
eventually(timeout(100000.milliseconds), interval(1000.milliseconds)) {
eventually(timeout(100.seconds), interval(1.second)) {
assert(allReceived.size === expectedTotal,
"didn't get expected number of messages, messages:\n" +
allReceived.asScala.mkString("\n"))
@ -243,7 +242,7 @@ class DirectKafkaStreamSuite
// Send some initial messages before starting context
kafkaTestUtils.sendMessages(topic, data)
eventually(timeout(10 seconds), interval(20 milliseconds)) {
eventually(timeout(10.seconds), interval(20.milliseconds)) {
assert(getLatestOffset() > 3)
}
val offsetBeforeStart = getLatestOffset()
@ -272,7 +271,7 @@ class DirectKafkaStreamSuite
ssc.start()
val newData = Map("b" -> 10)
kafkaTestUtils.sendMessages(topic, newData)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
collectedData.contains("b")
}
assert(!collectedData.contains("a"))
@ -295,7 +294,7 @@ class DirectKafkaStreamSuite
// Send some initial messages before starting context
kafkaTestUtils.sendMessages(topic, data)
eventually(timeout(10 seconds), interval(20 milliseconds)) {
eventually(timeout(10.seconds), interval(20.milliseconds)) {
assert(getLatestOffset() >= 10)
}
val offsetBeforeStart = getLatestOffset()
@ -326,7 +325,7 @@ class DirectKafkaStreamSuite
ssc.start()
val newData = Map("b" -> 10)
kafkaTestUtils.sendMessages(topic, newData)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
collectedData.contains("b")
}
assert(!collectedData.contains("a"))
@ -375,7 +374,7 @@ class DirectKafkaStreamSuite
sendData(i)
}
eventually(timeout(20 seconds), interval(50 milliseconds)) {
eventually(timeout(20.seconds), interval(50.milliseconds)) {
assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
}
@ -414,7 +413,7 @@ class DirectKafkaStreamSuite
sendData(i)
}
eventually(timeout(20 seconds), interval(50 milliseconds)) {
eventually(timeout(20.seconds), interval(50.milliseconds)) {
assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
}
ssc.stop()
@ -437,7 +436,7 @@ class DirectKafkaStreamSuite
def sendDataAndWaitForReceive(data: Seq[Int]) {
val strings = data.map { _.toString}
kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
assert(strings.forall { collectedData.contains })
}
}
@ -612,7 +611,7 @@ class DirectKafkaStreamSuite
estimator.updateRate(rate) // Set a new rate.
// Expect blocks of data equal to "rate", scaled by the interval length in secs.
val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
eventually(timeout(5.seconds), interval(10 milliseconds)) {
eventually(timeout(5.seconds), interval(10.milliseconds)) {
// Assert that rate estimator values are used to determine maxMessagesPerPartition.
// Funky "-" in message makes the complete assertion message read better.
assert(collectedData.asScala.exists(_.size == expectedSize),

View file

@ -21,7 +21,6 @@ import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import org.mockito.ArgumentMatchers._
@ -87,7 +86,7 @@ class KinesisCheckpointerSuite extends TestSuiteBase
kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
clock.advance(5 * checkpointInterval.milliseconds)
eventually(timeout(1 second)) {
eventually(timeout(1.second)) {
verify(checkpointerMock, times(1)).checkpoint(seqNum)
verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
}
@ -108,7 +107,7 @@ class KinesisCheckpointerSuite extends TestSuiteBase
kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
clock.advance(checkpointInterval.milliseconds * 5)
eventually(timeout(1 second)) {
eventually(timeout(1.second)) {
verify(checkpointerMock, atMost(1)).checkpoint(anyString())
}
}
@ -129,7 +128,7 @@ class KinesisCheckpointerSuite extends TestSuiteBase
kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
clock.advance(checkpointInterval.milliseconds)
eventually(timeout(1 second)) {
eventually(timeout(1.second)) {
verify(checkpointerMock, times(1)).checkpoint(anyString())
}
// don't block test thread
@ -138,12 +137,12 @@ class KinesisCheckpointerSuite extends TestSuiteBase
intercept[TimeoutException] {
// scalastyle:off awaitready
Await.ready(f, 50 millis)
Await.ready(f, 50.milliseconds)
// scalastyle:on awaitready
}
clock.advance(checkpointInterval.milliseconds / 2)
eventually(timeout(1 second)) {
eventually(timeout(1.second)) {
verify(checkpointerMock, times(1)).checkpoint(anyString)
verify(checkpointerMock, times(1)).checkpoint()
}
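
`Await.ready` takes a plain `FiniteDuration`, so the dot form works there as well. A standalone sketch (the future is illustrative):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitReadyExample {
  def main(args: Array[String]): Unit = {
    val f = Future { Thread.sleep(10); 42 }
    // was "Await.ready(f, 50 millis)" in postfix form
    Await.ready(f, 50.milliseconds)
    println(f.value) // Some(Success(42)) once the future has completed
  }
}
```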

View file

@ -24,7 +24,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions._
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
import com.amazonaws.services.kinesis.model.Record
import org.mockito.ArgumentMatchers.{anyListOf, anyString, eq => meq}
import org.mockito.ArgumentMatchers.{anyList, anyString, eq => meq}
import org.mockito.Mockito.{never, times, verify, when}
import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.mockito.MockitoSugar
@ -95,7 +95,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
recordProcessor.processRecords(batch, checkpointerMock)
verify(receiverMock, times(1)).isStopped()
verify(receiverMock, never).addRecords(anyString, anyListOf(classOf[Record]))
verify(receiverMock, never).addRecords(anyString, anyList())
verify(receiverMock, never).setCheckpointer(anyString, meq(checkpointerMock))
}
@ -103,7 +103,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
when(receiverMock.isStopped()).thenReturn(false)
when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
when(
receiverMock.addRecords(anyString, anyListOf(classOf[Record]))
receiverMock.addRecords(anyString, anyList())
).thenThrow(new RuntimeException())
intercept[RuntimeException] {
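
The change above swaps the deprecated `anyListOf(classOf[Record])` matcher for Mockito's `anyList()`; since generics are erased at runtime, the element type was never actually checked anyway. A hedged sketch of the new matcher style (the `Sink` trait is made up for illustration, not from the patch):

```scala
import java.util.{List => JList}
import org.mockito.ArgumentMatchers.{anyList, anyString}
import org.mockito.Mockito.{mock, never, verify}

object AnyListMatcherExample {
  // Illustrative stand-in for the receiver interface mocked in the suite.
  trait Sink {
    def addRecords(shardId: String, records: JList[String]): Unit
  }

  def main(args: Array[String]): Unit = {
    val sink = mock(classOf[Sink])
    // anyList() matches any java.util.List; the element type is inferred, not verified.
    verify(sink, never()).addRecords(anyString(), anyList())
  }
}
```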

View file

@ -19,7 +19,6 @@ package org.apache.spark.streaming.kinesis
import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
@ -198,10 +197,11 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
ssc.start()
val testData = 1 to 10
eventually(timeout(120 seconds), interval(10 second)) {
eventually(timeout(2.minutes), interval(10.seconds)) {
testUtils.pushData(testData, aggregateTestData)
assert(collected.synchronized { collected === testData.toSet },
"\nData received does not match data sent")
collected.synchronized {
assert(collected === testData.toSet, "\nData received does not match data sent")
}
}
ssc.stop(stopSparkContext = false)
}
@ -217,7 +217,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
.initialPosition(new Latest())
.checkpointInterval(Seconds(10))
.storageLevel(StorageLevel.MEMORY_ONLY)
.buildWithMessageHandler(addFive(_))
.buildWithMessageHandler(addFive)
stream shouldBe a [ReceiverInputDStream[_]]
@ -231,11 +231,12 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
ssc.start()
val testData = 1 to 10
eventually(timeout(120 seconds), interval(10 second)) {
eventually(timeout(2.minutes), interval(10.seconds)) {
testUtils.pushData(testData, aggregateTestData)
val modData = testData.map(_ + 5)
assert(collected.synchronized { collected === modData.toSet },
"\nData received does not match data sent")
collected.synchronized {
assert(collected === modData.toSet, "\nData received does not match data sent")
}
}
ssc.stop(stopSparkContext = false)
}
@ -316,10 +317,11 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
val testData2 = 11 to 20
val testData3 = 21 to 30
eventually(timeout(60 seconds), interval(10 second)) {
eventually(timeout(1.minute), interval(10.seconds)) {
localTestUtils.pushData(testData1, aggregateTestData)
assert(collected.synchronized { collected === testData1.toSet },
"\nData received does not match data sent")
collected.synchronized {
assert(collected === testData1.toSet, "\nData received does not match data sent")
}
}
val shardToSplit = localTestUtils.getShards().head
@ -332,10 +334,12 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
assert(splitCloseShards.size == 1)
assert(splitOpenShards.size == 2)
eventually(timeout(60 seconds), interval(10 second)) {
eventually(timeout(1.minute), interval(10.seconds)) {
localTestUtils.pushData(testData2, aggregateTestData)
assert(collected.synchronized { collected === (testData1 ++ testData2).toSet },
"\nData received does not match data sent after splitting a shard")
collected.synchronized {
assert(collected === (testData1 ++ testData2).toSet,
"\nData received does not match data sent after splitting a shard")
}
}
val Seq(shardToMerge, adjShard) = splitOpenShards
@ -348,10 +352,12 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
assert(mergedCloseShards.size == 3)
assert(mergedOpenShards.size == 1)
eventually(timeout(60 seconds), interval(10 second)) {
eventually(timeout(1.minute), interval(10.seconds)) {
localTestUtils.pushData(testData3, aggregateTestData)
assert(collected.synchronized { collected === (testData1 ++ testData2 ++ testData3).toSet },
"\nData received does not match data sent after merging shards")
collected.synchronized {
assert(collected === (testData1 ++ testData2 ++ testData3).toSet,
"\nData received does not match data sent after merging shards")
}
}
} finally {
ssc.stop(stopSparkContext = false)
@ -399,7 +405,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
// Run until there are at least 10 batches with some data in them
// If this times out because numBatchesWithData is empty, then its likely that foreachRDD
// function failed with exceptions, and nothing got added to `collectedData`
eventually(timeout(2 minutes), interval(1 seconds)) {
eventually(timeout(2.minutes), interval(1.second)) {
testUtils.pushData(1 to 5, aggregateTestData)
assert(isCheckpointPresent && numBatchesWithData > 10)
}
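
Besides the duration rewrites, the Kinesis hunks above move the assertion inside the `synchronized` block, so the comparison runs entirely under the lock instead of only the read. A rough sketch of that shape (the collection and the producer thread are illustrative):

```scala
import scala.collection.mutable
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object SynchronizedAssertExample {
  private val collected = mutable.HashSet[Int]()

  def main(args: Array[String]): Unit = {
    val testData = 1 to 10
    new Thread(() =>
      testData.foreach { i => collected.synchronized { collected += i } }
    ).start()

    eventually(timeout(1.minute), interval(100.milliseconds)) {
      // Hold the lock around the assert itself, not just around reading the set.
      collected.synchronized {
        assert(collected == testData.toSet, "\nData received does not match data sent")
      }
    }
  }
}
```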

View file

@ -19,7 +19,6 @@ package org.apache.spark.ml
import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps
import org.apache.hadoop.fs.Path
import org.mockito.ArgumentMatchers.{any, eq => meq}
@ -142,7 +141,7 @@ class MLEventsSuite
val expected = Seq(
event0, event1, event2, event3, event4, event5, event6, event7, event8, event9)
eventually(timeout(10 seconds), interval(1 second)) {
eventually(timeout(10.seconds), interval(1.second)) {
assert(events === expected)
}
// Test if they can be ser/de via JSON protocol.
@ -200,7 +199,7 @@ class MLEventsSuite
event7.output = output
val expected = Seq(event0, event1, event2, event3, event4, event5, event6, event7)
eventually(timeout(10 seconds), interval(1 second)) {
eventually(timeout(10.seconds), interval(1.second)) {
assert(events === expected)
}
// Test if they can be ser/de via JSON protocol.
@ -221,7 +220,7 @@ class MLEventsSuite
val pipelineWriter = newPipeline.write
assert(events.isEmpty)
pipelineWriter.save(path)
eventually(timeout(10 seconds), interval(1 second)) {
eventually(timeout(10.seconds), interval(1.second)) {
events.foreach {
case e: SaveInstanceStart if e.writer.isInstanceOf[DefaultParamsWriter] =>
assert(e.path.endsWith("writableStage"))
@ -245,18 +244,18 @@ class MLEventsSuite
val pipelineReader = Pipeline.read
assert(events.isEmpty)
pipelineReader.load(path)
eventually(timeout(10 seconds), interval(1 second)) {
eventually(timeout(10.seconds), interval(1.second)) {
events.foreach {
case e: LoadInstanceStart[PipelineStage]
if e.reader.isInstanceOf[DefaultParamsReader[PipelineStage]] =>
case e: LoadInstanceStart[_]
if e.reader.isInstanceOf[DefaultParamsReader[_]] =>
assert(e.path.endsWith("writableStage"))
case e: LoadInstanceEnd[PipelineStage]
if e.reader.isInstanceOf[DefaultParamsReader[PipelineStage]] =>
case e: LoadInstanceEnd[_]
if e.reader.isInstanceOf[DefaultParamsReader[_]] =>
assert(e.instance.isInstanceOf[PipelineStage])
case e: LoadInstanceStart[Pipeline] =>
case e: LoadInstanceStart[_] =>
assert(e.reader === pipelineReader)
case e: LoadInstanceEnd[Pipeline] =>
assert(e.instance.uid === newPipeline.uid)
case e: LoadInstanceEnd[_] =>
assert(e.instance.asInstanceOf[Pipeline].uid === newPipeline.uid)
case e => fail(s"Unexpected event thrown: $e")
}
}
@ -280,7 +279,7 @@ class MLEventsSuite
val pipelineWriter = pipelineModel.write
assert(events.isEmpty)
pipelineWriter.save(path)
eventually(timeout(10 seconds), interval(1 second)) {
eventually(timeout(10.seconds), interval(1.second)) {
events.foreach {
case e: SaveInstanceStart if e.writer.isInstanceOf[DefaultParamsWriter] =>
assert(e.path.endsWith("writableStage"))
@ -304,18 +303,18 @@ class MLEventsSuite
val pipelineModelReader = PipelineModel.read
assert(events.isEmpty)
pipelineModelReader.load(path)
eventually(timeout(10 seconds), interval(1 second)) {
eventually(timeout(10.seconds), interval(1.second)) {
events.foreach {
case e: LoadInstanceStart[PipelineStage]
if e.reader.isInstanceOf[DefaultParamsReader[PipelineStage]] =>
case e: LoadInstanceStart[_]
if e.reader.isInstanceOf[DefaultParamsReader[_]] =>
assert(e.path.endsWith("writableStage"))
case e: LoadInstanceEnd[PipelineStage]
if e.reader.isInstanceOf[DefaultParamsReader[PipelineStage]] =>
case e: LoadInstanceEnd[_]
if e.reader.isInstanceOf[DefaultParamsReader[_]] =>
assert(e.instance.isInstanceOf[PipelineStage])
case e: LoadInstanceStart[PipelineModel] =>
case e: LoadInstanceStart[_] =>
assert(e.reader === pipelineModelReader)
case e: LoadInstanceEnd[PipelineModel] =>
assert(e.instance.uid === pipelineModel.uid)
case e: LoadInstanceEnd[_] =>
assert(e.instance.asInstanceOf[PipelineModel].uid === pipelineModel.uid)
case e => fail(s"Unexpected event thrown: $e")
}
}
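
The MLEventsSuite hunks also fix unchecked-pattern warnings: a match on `LoadInstanceStart[PipelineStage]` cannot check the type parameter at runtime, so the patch matches on the wildcard `LoadInstanceStart[_]` and narrows the payload explicitly with `isInstanceOf`/`asInstanceOf`. A small sketch of the general technique (the `Box` type is made up for illustration):

```scala
object ErasedPatternExample {
  final case class Box[T](value: T)

  // "case b: Box[String]" would compile with an unchecked warning and match any Box;
  // matching on Box[_] and testing the payload states exactly what is checked.
  def describe(x: Any): String = x match {
    case b: Box[_] if b.value.isInstanceOf[String] => s"box of a string: ${b.value}"
    case b: Box[_] => s"box of something else: ${b.value}"
    case _ => "not a box"
  }

  def main(args: Array[String]): Unit = {
    println(describe(Box("pipeline")))
    println(describe(Box(42)))
  }
}
```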

View file

@ -24,10 +24,8 @@ import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps
import com.google.common.io.Files
import org.apache.commons.lang3.SerializationUtils
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.scalatest.{BeforeAndAfterAll, Matchers}
@ -169,7 +167,7 @@ abstract class BaseYarnClusterSuite
val handle = launcher.startApplication()
try {
eventually(timeout(2 minutes), interval(1 second)) {
eventually(timeout(2.minutes), interval(1.second)) {
assert(handle.getState().isFinal())
}
} finally {

View file

@ -18,18 +18,14 @@
package org.apache.spark.deploy.yarn
import java.io.File
import java.net.URL
import java.nio.charset.StandardCharsets
import java.util.{HashMap => JHashMap}
import scala.collection.mutable
import scala.concurrent.duration._
import scala.io.Source
import scala.language.postfixOps
import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.HadoopIllegalArgumentException
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils
import org.scalatest.Matchers
@ -209,7 +205,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
.startApplication()
try {
eventually(timeout(30 seconds), interval(100 millis)) {
eventually(timeout(30.seconds), interval(100.milliseconds)) {
handle.getState() should be (SparkAppHandle.State.RUNNING)
}
@ -217,7 +213,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
handle.getAppId() should startWith ("application_")
handle.stop()
eventually(timeout(30 seconds), interval(100 millis)) {
eventually(timeout(30.seconds), interval(100.milliseconds)) {
handle.getState() should be (SparkAppHandle.State.KILLED)
}
} finally {

View file

@ -24,7 +24,6 @@ import java.util.EnumSet
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.language.postfixOps
import org.apache.hadoop.fs.Path
import org.apache.hadoop.service.ServiceStateException
@ -327,10 +326,10 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString)
recoveryPath.toString should be (new Path(secretsFile2.getParentFile.toURI).toString)
eventually(timeout(10 seconds), interval(5 millis)) {
eventually(timeout(10.seconds), interval(5.milliseconds)) {
assert(!execStateFile.exists())
}
eventually(timeout(10 seconds), interval(5 millis)) {
eventually(timeout(10.seconds), interval(5.milliseconds)) {
assert(!secretsFile.exists())
}

View file

@ -17,17 +17,13 @@
package test.org.apache.spark.sql.sources.v2;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.sources.v2.SupportsRead;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.TableCapability;
import org.apache.spark.sql.sources.v2.reader.*;
import org.apache.spark.sql.types.StructType;
abstract class JavaSimpleBatchTable implements Table, SupportsRead {
@ -52,53 +48,3 @@ abstract class JavaSimpleBatchTable implements Table, SupportsRead {
}
}
abstract class JavaSimpleScanBuilder implements ScanBuilder, Scan, Batch {
@Override
public Scan build() {
return this;
}
@Override
public Batch toBatch() {
return this;
}
@Override
public StructType readSchema() {
return new StructType().add("i", "int").add("j", "int");
}
@Override
public PartitionReaderFactory createReaderFactory() {
return new JavaSimpleReaderFactory();
}
}
class JavaSimpleReaderFactory implements PartitionReaderFactory {
@Override
public PartitionReader<InternalRow> createReader(InputPartition partition) {
JavaRangeInputPartition p = (JavaRangeInputPartition) partition;
return new PartitionReader<InternalRow>() {
private int current = p.start - 1;
@Override
public boolean next() throws IOException {
current += 1;
return current < p.end;
}
@Override
public InternalRow get() {
return new GenericInternalRow(new Object[] {current, -current});
}
@Override
public void close() throws IOException {
}
};
}
}

View file

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.org.apache.spark.sql.sources.v2;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.PartitionReader;
import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;
class JavaSimpleReaderFactory implements PartitionReaderFactory {
@Override
public PartitionReader<InternalRow> createReader(InputPartition partition) {
JavaRangeInputPartition p = (JavaRangeInputPartition) partition;
return new PartitionReader<InternalRow>() {
private int current = p.start - 1;
@Override
public boolean next() {
current += 1;
return current < p.end;
}
@Override
public InternalRow get() {
return new GenericInternalRow(new Object[] {current, -current});
}
@Override
public void close() {
}
};
}
}

View file

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.org.apache.spark.sql.sources.v2;
import org.apache.spark.sql.sources.v2.reader.Batch;
import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;
import org.apache.spark.sql.sources.v2.reader.Scan;
import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
import org.apache.spark.sql.types.StructType;
abstract class JavaSimpleScanBuilder implements ScanBuilder, Scan, Batch {
@Override
public Scan build() {
return this;
}
@Override
public Batch toBatch() {
return this;
}
@Override
public StructType readSchema() {
return new StructType().add("i", "int").add("j", "int");
}
@Override
public PartitionReaderFactory createReaderFactory() {
return new JavaSimpleReaderFactory();
}
}

View file

@ -19,7 +19,6 @@ package org.apache.spark.sql
import scala.collection.mutable.HashSet
import scala.concurrent.duration._
import scala.language.postfixOps
import org.apache.spark.CleanerListener
import org.apache.spark.executor.DataReadMethod._
@ -251,7 +250,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
sql("UNCACHE TABLE testData")
assert(!spark.catalog.isCached("testData"), "Table 'testData' should not be cached")
eventually(timeout(10 seconds)) {
eventually(timeout(10.seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
}
@ -267,7 +266,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
"Eagerly cached in-memory table should have already been materialized")
uncacheTable("testCacheTable")
eventually(timeout(10 seconds)) {
eventually(timeout(10.seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
}
@ -284,7 +283,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
"Eagerly cached in-memory table should have already been materialized")
uncacheTable("testCacheTable")
eventually(timeout(10 seconds)) {
eventually(timeout(10.seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
}
@ -305,7 +304,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
"Lazily cached in-memory table should have been materialized")
uncacheTable("testData")
eventually(timeout(10 seconds)) {
eventually(timeout(10.seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
}
@ -446,7 +445,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
System.gc()
eventually(timeout(10 seconds)) {
eventually(timeout(10.seconds)) {
assert(toBeCleanedAccIds.synchronized { toBeCleanedAccIds.isEmpty },
"batchStats accumulators should be cleared after GC when uncacheTable")
}

View file

@ -136,7 +136,7 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
assertCached(df2)
// udf has been evaluated during caching, and thus should not be re-evaluated here
failAfter(2 seconds) {
failAfter(2.seconds) {
df2.collect()
}
@ -197,7 +197,7 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
val df4 = df1.groupBy('a).agg(sum('b)).agg(sum("sum(b)"))
assertCached(df4)
// reuse loaded cache
failAfter(3 seconds) {
failAfter(3.seconds) {
checkDataset(df4, Row(10))
}

View file

@ -308,7 +308,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
val offsets = scala.collection.mutable.ListBuffer[Int]()
val readerFactory = stream.createContinuousReaderFactory()
import org.scalatest.time.SpanSugar._
failAfter(5 seconds) {
failAfter(5.seconds) {
// inject rows, read and check the data and offsets
for (i <- 0 until numRecords) {
serverThread.enqueue(i.toString)

View file

@ -41,7 +41,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
assert(coordinatorRef.getLocation(id) === None)
coordinatorRef.reportActiveInstance(id, "hostX", "exec1")
eventually(timeout(5 seconds)) {
eventually(timeout(5.seconds)) {
assert(coordinatorRef.verifyIfInstanceActive(id, "exec1"))
assert(
coordinatorRef.getLocation(id) ===
@ -50,7 +50,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
coordinatorRef.reportActiveInstance(id, "hostX", "exec2")
eventually(timeout(5 seconds)) {
eventually(timeout(5.seconds)) {
assert(coordinatorRef.verifyIfInstanceActive(id, "exec1") === false)
assert(coordinatorRef.verifyIfInstanceActive(id, "exec2"))
@ -75,7 +75,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
coordinatorRef.reportActiveInstance(id2, host, exec)
coordinatorRef.reportActiveInstance(id3, host, exec)
eventually(timeout(5 seconds)) {
eventually(timeout(5.seconds)) {
assert(coordinatorRef.verifyIfInstanceActive(id1, exec))
assert(coordinatorRef.verifyIfInstanceActive(id2, exec))
assert(coordinatorRef.verifyIfInstanceActive(id3, exec))
@ -107,7 +107,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
coordRef1.reportActiveInstance(id, "hostX", "exec1")
eventually(timeout(5 seconds)) {
eventually(timeout(5.seconds)) {
assert(coordRef2.verifyIfInstanceActive(id, "exec1"))
assert(
coordRef2.getLocation(id) ===

View file

@ -416,7 +416,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
}
}
val timeoutDuration = 60 seconds
val timeoutDuration = 1.minute
quietly {
withSpark(new SparkContext(conf)) { sc =>

View file

@ -93,7 +93,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
testAwaitAnyTermination(ExpectBlocked)
// Stop a query asynchronously and see if it is reported through awaitAnyTermination
val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = false)
val q1 = stopRandomQueryAsync(stopAfter = 100.milliseconds, withError = false)
testAwaitAnyTermination(ExpectNotBlocked)
require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned
@ -106,7 +106,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
// Terminate a query asynchronously with exception and see awaitAnyTermination throws
// the exception
val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
val q2 = stopRandomQueryAsync(100.milliseconds, withError = true)
testAwaitAnyTermination(ExpectException[SparkException])
require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned
@ -119,10 +119,10 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
// Terminate multiple queries, one with failure and see whether awaitAnyTermination throws
// the exception
val q3 = stopRandomQueryAsync(10 milliseconds, withError = false)
val q3 = stopRandomQueryAsync(10.milliseconds, withError = false)
testAwaitAnyTermination(ExpectNotBlocked)
require(!q3.isActive)
val q4 = stopRandomQueryAsync(10 milliseconds, withError = true)
val q4 = stopRandomQueryAsync(10.milliseconds, withError = true)
eventually(Timeout(streamingTimeout)) { require(!q4.isActive) }
// After q4 terminates with exception, awaitAnyTerm should start throwing exception
testAwaitAnyTermination(ExpectException[SparkException])
@ -138,81 +138,81 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
// awaitAnyTermination should be blocking or non-blocking depending on timeout values
testAwaitAnyTermination(
ExpectBlocked,
awaitTimeout = 4 seconds,
awaitTimeout = 4.seconds,
expectedReturnedValue = false,
testBehaviorFor = 2 seconds)
testBehaviorFor = 2.seconds)
testAwaitAnyTermination(
ExpectNotBlocked,
awaitTimeout = 50 milliseconds,
awaitTimeout = 50.milliseconds,
expectedReturnedValue = false,
testBehaviorFor = 1 second)
testBehaviorFor = 1.second)
// Stop a query asynchronously within timeout and awaitAnyTerm should be unblocked
val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = false)
val q1 = stopRandomQueryAsync(stopAfter = 100.milliseconds, withError = false)
testAwaitAnyTermination(
ExpectNotBlocked,
awaitTimeout = 2 seconds,
awaitTimeout = 2.seconds,
expectedReturnedValue = true,
testBehaviorFor = 4 seconds)
testBehaviorFor = 4.seconds)
require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned
// All subsequent calls to awaitAnyTermination should be non-blocking even if timeout is high
testAwaitAnyTermination(
ExpectNotBlocked, awaitTimeout = 4 seconds, expectedReturnedValue = true)
ExpectNotBlocked, awaitTimeout = 4.seconds, expectedReturnedValue = true)
// Resetting termination should make awaitAnyTermination() blocking again
spark.streams.resetTerminated()
testAwaitAnyTermination(
ExpectBlocked,
awaitTimeout = 4 seconds,
awaitTimeout = 4.seconds,
expectedReturnedValue = false,
testBehaviorFor = 1 second)
testBehaviorFor = 1.second)
// Terminate a query asynchronously with exception within timeout, awaitAnyTermination should
// throws the exception
val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
val q2 = stopRandomQueryAsync(100.milliseconds, withError = true)
testAwaitAnyTermination(
ExpectException[SparkException],
awaitTimeout = 4 seconds,
testBehaviorFor = 6 seconds)
awaitTimeout = 4.seconds,
testBehaviorFor = 6.seconds)
require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned
// All subsequent calls to awaitAnyTermination should throw the exception
testAwaitAnyTermination(
ExpectException[SparkException],
awaitTimeout = 2 seconds,
testBehaviorFor = 4 seconds)
awaitTimeout = 2.seconds,
testBehaviorFor = 4.seconds)
// Terminate a query asynchronously outside the timeout, awaitAnyTerm should be blocked
spark.streams.resetTerminated()
val q3 = stopRandomQueryAsync(2 seconds, withError = true)
val q3 = stopRandomQueryAsync(2.seconds, withError = true)
testAwaitAnyTermination(
ExpectNotBlocked,
awaitTimeout = 100 milliseconds,
awaitTimeout = 100.milliseconds,
expectedReturnedValue = false,
testBehaviorFor = 4 seconds)
testBehaviorFor = 4.seconds)
// After that query is stopped, awaitAnyTerm should throw exception
eventually(Timeout(streamingTimeout)) { require(!q3.isActive) } // wait for query to stop
testAwaitAnyTermination(
ExpectException[SparkException],
awaitTimeout = 100 milliseconds,
testBehaviorFor = 4 seconds)
awaitTimeout = 100.milliseconds,
testBehaviorFor = 4.seconds)
// Terminate multiple queries, one with failure and see whether awaitAnyTermination throws
// the exception
spark.streams.resetTerminated()
val q4 = stopRandomQueryAsync(10 milliseconds, withError = false)
val q4 = stopRandomQueryAsync(10.milliseconds, withError = false)
testAwaitAnyTermination(
ExpectNotBlocked, awaitTimeout = 2 seconds, expectedReturnedValue = true)
ExpectNotBlocked, awaitTimeout = 2.seconds, expectedReturnedValue = true)
require(!q4.isActive)
val q5 = stopRandomQueryAsync(10 milliseconds, withError = true)
val q5 = stopRandomQueryAsync(10.milliseconds, withError = true)
eventually(Timeout(streamingTimeout)) { require(!q5.isActive) }
// After q5 terminates with exception, awaitAnyTerm should start throwing exception
testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 2 seconds)
testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 2.seconds)
}
}
@ -276,7 +276,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
expectedBehavior: ExpectedBehavior,
expectedReturnedValue: Boolean = false,
awaitTimeout: Span = null,
testBehaviorFor: Span = 4 seconds
testBehaviorFor: Span = 4.seconds
): Unit = {
def awaitTermFunc(): Unit = {
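
The default argument above uses the same dot-form literal as a `Span` default. A sketch of a helper with such a default, assuming ScalaTest's `SpanSugar` (the `poll` helper is made up for illustration):

```scala
import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._

object SpanDefaultParamExample {
  // 4.seconds is a ScalaTest Span here, usable as a default argument like in the suite above.
  def poll(condition: () => Boolean, patience: Span = 4.seconds): Boolean = {
    val deadline = System.nanoTime() + patience.totalNanos
    var ok = condition()
    while (!ok && System.nanoTime() < deadline) { Thread.sleep(10); ok = condition() }
    ok
  }

  def main(args: Array[String]): Unit =
    println(poll(() => true)) // true
}
```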

View file

@ -20,7 +20,6 @@ package org.apache.spark.sql.streaming
import java.util.UUID
import scala.collection.JavaConverters._
import scala.language.postfixOps
import org.json4s._
import org.json4s.jackson.JsonMethods._
@ -203,7 +202,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
val progress = query.lastProgress
assert(progress.stateOperators.length > 0)
// Should emit new progresses every 10 ms, but we could be facing a slow Jenkins
eventually(timeout(1 minute)) {
eventually(timeout(1.minute)) {
val nextProgress = query.lastProgress
assert(nextProgress.timestamp !== progress.timestamp)
assert(nextProgress.numInputRows === 0)

View file

@ -86,12 +86,12 @@ class UISeleniumSuite
queries.foreach(statement.execute)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
go to baseURL
find(cssSelector("""ul li a[href*="sql"]""")) should not be None
}
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
go to (baseURL + "/sql")
find(id("sessionstat")) should not be None
find(id("sqlstat")) should not be None

View file

@ -19,7 +19,7 @@ package org.apache.spark.streaming.receiver
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.{existentials, postfixOps}
import scala.language.existentials
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

View file

@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.ExecutionContextTaskSupport
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.postfixOps
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@ -142,7 +141,7 @@ private[streaming] class FileBasedWriteAheadLog(
CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, () => reader.close())
}
if (!closeFileAfterWrite) {
logFilesToRead.iterator.map(readFile).flatten.asJava
logFilesToRead.iterator.flatMap(readFile).asJava
} else {
// For performance gains, it makes sense to parallelize the recovery if
// closeFileAfterWrite = true
@ -190,7 +189,7 @@ private[streaming] class FileBasedWriteAheadLog(
if (waitForCompletion) {
import scala.concurrent.duration._
// scalastyle:off awaitready
Await.ready(f, 1 second)
Await.ready(f, 1.second)
// scalastyle:on awaitready
}
} catch {
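
The write-ahead-log hunk above also rewrites `iterator.map(readFile).flatten` as `iterator.flatMap(readFile)`: the same elements, expressed directly and still lazily. A tiny sketch (the `readFile` helper is illustrative):

```scala
object IteratorFlatMapExample {
  // Illustrative stand-in for lazily reading one log segment.
  def readFile(name: String): Iterator[String] =
    Iterator(s"$name:record-1", s"$name:record-2")

  def main(args: Array[String]): Unit = {
    val logFilesToRead = Seq("wal-0", "wal-1")
    // Equivalent to logFilesToRead.iterator.map(readFile).flatten, written in one step.
    val records = logFilesToRead.iterator.flatMap(readFile)
    println(records.toList)
  }
}
```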

View file

@ -162,12 +162,12 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
val outputStream = getTestOutputStream[V](ssc.graph.getOutputStreams())
eventually(timeout(10 seconds)) {
eventually(timeout(10.seconds)) {
ssc.awaitTerminationOrTimeout(10)
assert(batchCounter.getLastCompletedBatchTime === targetBatchTime)
}
eventually(timeout(10 seconds)) {
eventually(timeout(10.seconds)) {
val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter {
_.getName.contains(clock.getTimeMillis.toString)
}

View file

@ -22,7 +22,6 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.ClassTag
import org.apache.hadoop.conf.Configuration
@ -199,7 +198,7 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
val cleanupThreshTime = 3000L
handler.cleanupOldBlocks(cleanupThreshTime)
eventually(timeout(10000 millis), interval(10 millis)) {
eventually(timeout(10.seconds), interval(10.milliseconds)) {
getWriteAheadLogFiles().size should be < preCleanupLogFiles.size
}
}

View file

@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.{implicitConversions, postfixOps}
import scala.language.implicitConversions
import scala.util.Random
import org.apache.hadoop.conf.Configuration
@ -276,7 +276,7 @@ class ReceivedBlockTrackerSuite
getWrittenLogData(getWriteAheadLogFiles().last) should contain(createBatchCleanup(batchTime1))
// Verify that at least one log file gets deleted
eventually(timeout(10 seconds), interval(10 millisecond)) {
eventually(timeout(10.seconds), interval(10.millisecond)) {
getWriteAheadLogFiles() should not contain oldestLogFile
}
printLogFiles("After clean")

View file

@ -64,7 +64,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
// Verify that the receiver
intercept[Exception] {
failAfter(200 millis) {
failAfter(200.milliseconds) {
executingThread.join()
}
}
@ -78,7 +78,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
assert(receiver.isStarted)
assert(!receiver.isStopped())
assert(receiver.otherThread.isAlive)
eventually(timeout(100 millis), interval(10 millis)) {
eventually(timeout(100.milliseconds), interval(10.milliseconds)) {
assert(receiver.receiving)
}
@ -107,12 +107,12 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
// Verify restarting actually stops and starts the receiver
receiver.restart("restarting", null, 600)
eventually(timeout(300 millis), interval(10 millis)) {
eventually(timeout(300.milliseconds), interval(10.milliseconds)) {
// receiver will be stopped async
assert(receiver.isStopped)
assert(receiver.onStopCalled)
}
eventually(timeout(1000 millis), interval(10 millis)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
// receiver will be started async
assert(receiver.onStartCalled)
assert(executor.isReceiverStarted)
@ -122,7 +122,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
}
// Verify that stopping actually stops the thread
failAfter(100 millis) {
failAfter(100.milliseconds) {
receiver.stop("test")
assert(receiver.isStopped)
assert(!receiver.otherThread.isAlive)
@ -159,7 +159,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
val recordedBlocks = blockGeneratorListener.arrayBuffers
val recordedData = recordedBlocks.flatten
assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received")
assert(blockGeneratorListener.arrayBuffers.nonEmpty, "No blocks received")
assert(recordedData.toSet === generatedData.toSet, "Received data not same")
// recordedData size should be close to the expected rate; use an error margin proportional to
@ -245,15 +245,15 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
// Run until sufficient WAL files have been generated and
// the first WAL files has been deleted
eventually(timeout(20 seconds), interval(batchDuration.milliseconds millis)) {
eventually(timeout(20.seconds), interval(batchDuration.milliseconds.millis)) {
val (logFiles1, logFiles2) = getBothCurrentLogFiles()
allLogFiles1 ++= logFiles1
allLogFiles2 ++= logFiles2
if (allLogFiles1.size > 0) {
assert(!logFiles1.contains(allLogFiles1.toSeq.sorted.head))
if (allLogFiles1.nonEmpty) {
assert(!logFiles1.contains(allLogFiles1.toSeq.min))
}
if (allLogFiles2.size > 0) {
assert(!logFiles2.contains(allLogFiles2.toSeq.sorted.head))
if (allLogFiles2.nonEmpty) {
assert(!logFiles2.contains(allLogFiles2.toSeq.min))
}
assert(allLogFiles1.size >= 7)
assert(allLogFiles2.size >= 7)
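
The ReceiverSuite hunks above also tidy two collection idioms: `size > 0` becomes `nonEmpty`, and `toSeq.sorted.head` becomes `toSeq.min`, which picks the same element for a non-empty collection without sorting a copy. A quick sketch:

```scala
object CollectionIdiomExample {
  def main(args: Array[String]): Unit = {
    val logFiles = Set("log-3", "log-1", "log-2")

    assert(logFiles.nonEmpty)                                 // was: logFiles.size > 0
    assert(logFiles.toSeq.min == logFiles.toSeq.sorted.head)  // both pick "log-1"
    println(logFiles.toSeq.min)
  }
}
```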

View file

@ -211,7 +211,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
// Local props set after start should be ignored
ssc.sc.setLocalProperty("customPropKey", "value2")
eventually(timeout(10 seconds), interval(10 milliseconds)) {
eventually(timeout(10.seconds), interval(10.milliseconds)) {
assert(allFound)
}
@ -342,7 +342,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
input.foreachRDD(_ => {})
ssc.start()
// Call `ssc.stop` at once so that it's possible that the receiver will miss "StopReceiver"
failAfter(30000 millis) {
failAfter(30.seconds) {
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}
@ -398,18 +398,18 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
inputStream.map(x => x).register()
// test whether start() blocks indefinitely or not
failAfter(2000 millis) {
failAfter(2.seconds) {
ssc.start()
}
// test whether awaitTermination() exits after give amount of time
failAfter(1000 millis) {
failAfter(1.second) {
ssc.awaitTerminationOrTimeout(500)
}
// test whether awaitTermination() does not exit if not time is given
val exception = intercept[Exception] {
failAfter(1000 millis) {
failAfter(1.second) {
ssc.awaitTermination()
throw new Exception("Did not wait for stop")
}
@ -418,7 +418,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
var t: Thread = null
// test whether wait exits if context is stopped
failAfter(10000 millis) { // 10 seconds because spark takes a long time to shutdown
failAfter(10.seconds) { // 10 seconds because spark takes a long time to shutdown
t = new Thread() {
override def run() {
Thread.sleep(500)
@ -439,7 +439,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
val inputStream = addInputStream(ssc)
inputStream.map(x => x).register()
failAfter(10000 millis) {
failAfter(10.seconds) {
ssc.start()
ssc.stop()
ssc.awaitTermination()
@ -479,13 +479,13 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
ssc.start()
// test whether awaitTerminationOrTimeout() return false after give amount of time
failAfter(1000 millis) {
failAfter(1.second) {
assert(ssc.awaitTerminationOrTimeout(500) === false)
}
var t: Thread = null
// test whether awaitTerminationOrTimeout() return true if context is stopped
failAfter(10000 millis) { // 10 seconds because spark takes a long time to shutdown
failAfter(10.seconds) { // 10 seconds because spark takes a long time to shutdown
t = new Thread() {
override def run() {
Thread.sleep(500)
@ -528,7 +528,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
// getOrCreate should create new context with empty path
testGetOrCreate {
ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _)
ssc = StreamingContext.getOrCreate(emptyPath, () => creatingFunction())
assert(ssc != null, "no context created")
assert(newContextCreated, "new context not created")
}
@ -537,19 +537,19 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
// getOrCreate should throw exception with fake checkpoint file and createOnError = false
intercept[Exception] {
ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, creatingFunction _)
ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, () => creatingFunction())
}
// getOrCreate should throw exception with fake checkpoint file
intercept[Exception] {
ssc = StreamingContext.getOrCreate(
corruptedCheckpointPath, creatingFunction _, createOnError = false)
corruptedCheckpointPath, () => creatingFunction(), createOnError = false)
}
// getOrCreate should create new context with fake checkpoint file and createOnError = true
testGetOrCreate {
ssc = StreamingContext.getOrCreate(
corruptedCheckpointPath, creatingFunction _, createOnError = true)
corruptedCheckpointPath, () => creatingFunction(), createOnError = true)
assert(ssc != null, "no context created")
assert(newContextCreated, "new context not created")
}
@ -558,7 +558,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
// getOrCreate should recover context with checkpoint path, and recover old configuration
testGetOrCreate {
ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
ssc = StreamingContext.getOrCreate(checkpointPath, () => creatingFunction())
assert(ssc != null, "no context created")
assert(!newContextCreated, "old context not recovered")
assert(ssc.conf.get("someKey") === "someValue", "checkpointed config not recovered")
@ -567,7 +567,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
// getOrCreate should recover StreamingContext with existing SparkContext
testGetOrCreate {
sc = new SparkContext(conf)
ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
ssc = StreamingContext.getOrCreate(checkpointPath, () => creatingFunction())
assert(ssc != null, "no context created")
assert(!newContextCreated, "old context not recovered")
assert(!ssc.conf.contains("someKey"), "checkpointed config unexpectedly recovered")
@ -669,41 +669,41 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
conf.clone.set("spark.streaming.clock", "org.apache.spark.util.ManualClock"), batchDuration)
addInputStream(ssc).register()
ssc.start()
val returnedSsc = StreamingContext.getActiveOrCreate(checkpointPath, creatingFunction _)
val returnedSsc = StreamingContext.getActiveOrCreate(checkpointPath, () => creatingFunction())
assert(!newContextCreated, "new context created instead of returning")
assert(returnedSsc.eq(ssc), "returned context is not the activated context")
}
// getActiveOrCreate should create new context with empty path
testGetActiveOrCreate {
ssc = StreamingContext.getActiveOrCreate(emptyPath, creatingFunction _)
ssc = StreamingContext.getActiveOrCreate(emptyPath, () => creatingFunction())
assert(ssc != null, "no context created")
assert(newContextCreated, "new context not created")
}
// getActiveOrCreate should throw exception with fake checkpoint file and createOnError = false
intercept[Exception] {
ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, creatingFunction _)
ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, () => creatingFunction())
}
// getActiveOrCreate should throw exception with fake checkpoint file
intercept[Exception] {
ssc = StreamingContext.getActiveOrCreate(
corruptedCheckpointPath, creatingFunction _, createOnError = false)
corruptedCheckpointPath, () => creatingFunction(), createOnError = false)
}
// getActiveOrCreate should create new context with fake
// checkpoint file and createOnError = true
testGetActiveOrCreate {
ssc = StreamingContext.getActiveOrCreate(
corruptedCheckpointPath, creatingFunction _, createOnError = true)
corruptedCheckpointPath, () => creatingFunction(), createOnError = true)
assert(ssc != null, "no context created")
assert(newContextCreated, "new context not created")
}
// getActiveOrCreate should recover context with checkpoint path, and recover old configuration
testGetActiveOrCreate {
ssc = StreamingContext.getActiveOrCreate(checkpointPath, creatingFunction _)
ssc = StreamingContext.getActiveOrCreate(checkpointPath, () => creatingFunction())
assert(ssc != null, "no context created")
assert(!newContextCreated, "old context not recovered")
assert(ssc.conf.get("someKey") === "someValue")
@ -781,14 +781,14 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
_ssc.queueStream[Int](Queue(rdd)).register()
_ssc
}
ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
ssc = StreamingContext.getOrCreate(checkpointDirectory, () => creatingFunction())
ssc.start()
eventually(timeout(10000 millis)) {
eventually(timeout(10.seconds)) {
assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1)
}
ssc.stop()
val e = intercept[SparkException] {
ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
ssc = StreamingContext.getOrCreate(checkpointDirectory, () => creatingFunction())
}
// StreamingContext.validate changes the message, so use "contains" here
assert(e.getCause.getMessage.contains("queueStream doesn't support checkpointing. " +
@ -855,7 +855,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
ssc.textFileStream(testDirectory).foreachRDD { rdd => rdd.count() }
ssc.start()
try {
eventually(timeout(30000 millis)) {
eventually(timeout(30.seconds)) {
assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1)
}
} finally {
@ -967,7 +967,7 @@ package object testPackage extends Assertions {
}
ssc.start()
eventually(timeout(10000 millis), interval(10 millis)) {
eventually(timeout(10.seconds), interval(10.milliseconds)) {
assert(rddGenerated && rddCreationSiteCorrect, "RDD creation site was not correct")
assert(rddGenerated && foreachCallSiteCorrect, "Call site in foreachRDD was not correct")
}
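
The recurring change from creatingFunction _ to () => creatingFunction() in this suite swaps method eta-expansion for an explicit Function0 thunk, sidestepping the deprecation warning Scala 2.12 emits for eta-expansion of zero-argument methods. A minimal standalone sketch of the pattern, where getOrCreate and creatingFunction are hypothetical stand-ins for the suite's helpers rather than the real StreamingContext API:

object ThunkSketch {
  private var newContextCreated = false

  // Hypothetical stand-in for the suite's creatingFunction(): records that a
  // fresh "context" was built and returns it.
  private def creatingFunction(): String = {
    newContextCreated = true
    "new-streaming-context"
  }

  // Hypothetical stand-in for StreamingContext.getOrCreate: the factory is a
  // Function0 and is invoked only when nothing can be recovered from the path.
  private def getOrCreate(checkpointPath: String, creator: () => String): String =
    if (checkpointPath.isEmpty) creator() else s"recovered:$checkpointPath"

  def main(args: Array[String]): Unit = {
    // Before: getOrCreate(path, creatingFunction _)  -- relies on eta-expansion
    // After: pass an explicit thunk, as the updated tests do
    val ssc = getOrCreate("", () => creatingFunction())
    assert(newContextCreated, "new context not created")
    println(ssc)
  }
}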

View file

@ -130,7 +130,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
ssc.start()
try {
eventually(timeout(30 seconds), interval(20 millis)) {
eventually(timeout(30.seconds), interval(20.milliseconds)) {
collector.startedReceiverStreamIds.size should equal (1)
collector.startedReceiverStreamIds.peek() should equal (0)
collector.stoppedReceiverStreamIds.size should equal (1)
@ -157,7 +157,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
ssc.start()
try {
eventually(timeout(30 seconds), interval(20 millis)) {
eventually(timeout(30.seconds), interval(20.milliseconds)) {
collector.startedOutputOperationIds.asScala.take(3) should be (Seq(0, 1, 2))
collector.completedOutputOperationIds.asScala.take(3) should be (Seq(0, 1, 2))
}

View file

@ -97,12 +97,12 @@ class UISeleniumSuite
val sparkUI = ssc.sparkContext.ui.get
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
go to (sparkUI.webUrl.stripSuffix("/"))
find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None)
}
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
// check whether streaming page exists
go to (sparkUI.webUrl.stripSuffix("/") + "/streaming")
val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
@ -196,12 +196,12 @@ class UISeleniumSuite
ssc.stop(false)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
go to (sparkUI.webUrl.stripSuffix("/"))
find(cssSelector( """ul li a[href*="streaming"]""")) should be(None)
}
eventually(timeout(10 seconds), interval(50 milliseconds)) {
eventually(timeout(10.seconds), interval(50.milliseconds)) {
go to (sparkUI.webUrl.stripSuffix("/") + "/streaming")
val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
h3Text should not contain("Streaming Statistics")

View file

@ -84,7 +84,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
}
clock.advance(blockIntervalMs) // advance clock to generate blocks
withClue("blocks not generated or pushed") {
eventually(timeout(1 second)) {
eventually(timeout(1.second)) {
assert(listener.onGenerateBlockCalled)
assert(listener.onPushBlockCalled)
}
@ -100,7 +100,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
listener.addedData.asScala.toSeq should contain theSameElementsInOrderAs (data2)
listener.addedMetadata.asScala.toSeq should contain theSameElementsInOrderAs (metadata2)
clock.advance(blockIntervalMs) // advance clock to generate blocks
eventually(timeout(1 second)) {
eventually(timeout(1.second)) {
val combined = data1 ++ data2
listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs combined
}
@ -112,7 +112,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
val combinedMetadata = metadata2 :+ metadata3
listener.addedMetadata.asScala.toSeq should contain theSameElementsInOrderAs (combinedMetadata)
clock.advance(blockIntervalMs) // advance clock to generate blocks
eventually(timeout(1 second)) {
eventually(timeout(1.second)) {
val combinedData = data1 ++ data2 ++ data3
listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs (combinedData)
}
@ -120,7 +120,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
// Stop the block generator by starting the stop on a different thread and
// then advancing the manual clock for the stopping to proceed.
val thread = stopBlockGenerator(blockGenerator)
eventually(timeout(1 second), interval(10 milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
clock.advance(blockIntervalMs)
assert(blockGenerator.isStopped())
}
@ -160,7 +160,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
// - Finally, wait for all blocks to be pushed
clock.advance(1) // to make sure that the timer for another interval to complete
val thread = stopBlockGenerator(blockGenerator)
eventually(timeout(1 second), interval(10 milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
assert(blockGenerator.isActive() === false)
}
assert(blockGenerator.isStopped() === false)
@ -181,7 +181,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
// (expected as stop() should never complete) or a SparkException (unexpected as stop()
// completed and thread terminated).
val exception = intercept[Exception] {
failAfter(200 milliseconds) {
failAfter(200.milliseconds) {
thread.join()
throw new SparkException(
"BlockGenerator.stop() completed before generating timer was stopped")
@ -193,7 +193,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
// Verify that the final data is present in the final generated block and
// pushed before complete stop
assert(blockGenerator.isStopped() === false) // generator has not stopped yet
eventually(timeout(10 seconds), interval(10 milliseconds)) {
eventually(timeout(10.seconds), interval(10.milliseconds)) {
// Keep calling `advance` to avoid blocking forever in `clock.waitTillTime`
clock.advance(blockIntervalMs)
assert(thread.isAlive === false)
@ -213,7 +213,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
blockGenerator.start()
assert(listener.onErrorCalled === false)
blockGenerator.addData(1)
eventually(timeout(1 second), interval(10 milliseconds)) {
eventually(timeout(1.second), interval(10.milliseconds)) {
assert(listener.onErrorCalled)
}
blockGenerator.stop()
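
All of the timeout and interval rewrites above (10000 millis to 10.seconds, 50 milliseconds to 50.milliseconds, and so on) use the same scala.concurrent.duration enrichments; only the spelling changes, not the values. A small self-contained sketch of the two spellings, with arbitrary example values:

import scala.concurrent.duration._

object DurationSyntaxSketch {
  def main(args: Array[String]): Unit = {
    // Dotted method-call form used throughout the updated suites: no language
    // feature import is needed and the unit stays attached to the number.
    val timeoutSpan: FiniteDuration = 10.seconds
    val pollInterval: FiniteDuration = 10.milliseconds

    // The old postfix spelling, e.g. `10000 millis`, denotes the same value
    // but compiles with a feature warning unless scala.language.postfixOps is
    // imported.
    assert(timeoutSpan == 10000.millis, "same duration, different spelling")
    println(s"timeout=$timeoutSpan interval=$pollInterval")
  }
}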

View file

@ -61,7 +61,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
val expectedWaitTime = clock.getTimeMillis() + advancedTime
clock.advance(advancedTime)
// Make sure ExecutorAllocationManager.manageAllocation is called
eventually(timeout(10 seconds)) {
eventually(timeout(10.seconds)) {
assert(clock.isStreamWaitingAt(expectedWaitTime))
}
body

View file

@ -20,7 +20,6 @@ package org.apache.spark.streaming.scheduler
import java.util.concurrent.CountDownLatch
import scala.concurrent.duration._
import scala.language.postfixOps
import org.scalatest.concurrent.Eventually._
@ -69,10 +68,10 @@ class JobGeneratorSuite extends TestSuiteBase {
val longBatchNumber = 3 // 3rd batch will take a long time
val longBatchTime = longBatchNumber * batchDuration.milliseconds
val testTimeout = timeout(10 seconds)
val testTimeout = timeout(10.seconds)
val inputStream = ssc.receiverStream(new TestReceiver)
inputStream.foreachRDD((rdd: RDD[Int], time: Time) => {
inputStream.foreachRDD((_: RDD[Int], time: Time) => {
if (time.milliseconds == longBatchTime) {
while (waitLatch.getCount() > 0) {
waitLatch.await()
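
With every duration written in the dotted form, the scala.language.postfixOps import removed in the hunk above (and again in a later import hunk) no longer has anything to enable. A sketch of the compiler-facing difference; the commented line is the spelling the import used to permit warning-free:

import scala.concurrent.duration._

object PostfixImportSketch {
  // Compiles cleanly with no language feature import in scope.
  val testTimeout: FiniteDuration = 10.seconds

  // The postfix spelling below would draw a feature warning about postfix
  // operators unless scala.language.postfixOps were imported, which is why
  // the spelling and the import could be dropped together:
  // val testTimeoutPostfix: FiniteDuration = 10 seconds

  def main(args: Array[String]): Unit = println(testTimeout)
}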

View file

@ -41,7 +41,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {
try {
// we wait until the Receiver has registered with the tracker,
// otherwise our rate update is lost
eventually(timeout(5 seconds)) {
eventually(timeout(5.seconds)) {
assert(RateTestReceiver.getActive().nonEmpty)
}
@ -49,7 +49,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {
// Verify that the rate of the block generator in the receiver get updated
val activeReceiver = RateTestReceiver.getActive().get
tracker.sendRateUpdate(inputDStream.id, newRateLimit)
eventually(timeout(5 seconds)) {
eventually(timeout(5.seconds)) {
assert(activeReceiver.getDefaultBlockGeneratorRateLimit() === newRateLimit,
"default block generator did not receive rate update")
assert(activeReceiver.getCustomBlockGeneratorRateLimit() === newRateLimit,
@ -76,7 +76,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {
output.register()
ssc.start()
StoppableReceiver.shouldStop = true
eventually(timeout(10 seconds), interval(10 millis)) {
eventually(timeout(10.seconds), interval(10.milliseconds)) {
// The receiver is stopped once, so if it's restarted, it should be started twice.
assert(startTimes === 2)
}
@ -98,7 +98,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {
val output = new TestOutputStream(input)
output.register()
ssc.start()
eventually(timeout(10 seconds), interval(10 millis)) {
eventually(timeout(10.seconds), interval(10.milliseconds)) {
// If preferredLocations is set correctly, receiverTaskLocality should be PROCESS_LOCAL
assert(receiverTaskLocality === TaskLocality.PROCESS_LOCAL)
}

View file

@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent._
import scala.concurrent.duration._
import scala.language.{implicitConversions, postfixOps}
import scala.language.implicitConversions
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@ -135,7 +135,7 @@ abstract class CommonWriteAheadLogTests(
if (waitForCompletion) {
assert(getLogFilesInDirectory(testDir).size < logFiles.size)
} else {
eventually(Eventually.timeout(1 second), interval(10 milliseconds)) {
eventually(Eventually.timeout(1.second), interval(10.milliseconds)) {
assert(getLogFilesInDirectory(testDir).size < logFiles.size)
}
}
@ -504,7 +504,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
// The queue.take() immediately takes the 3, and there is nothing left in the queue at that
// moment. Then the promise blocks the writing of 3. The rest get queued.
writeAsync(batchedWal, event1, 3L)
eventually(timeout(1 second)) {
eventually(timeout(1.second)) {
assert(blockingWal.isBlocked)
assert(batchedWal.invokePrivate(queueLength()) === 0)
}
@ -514,12 +514,12 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
// we would like event 5 to be written before event 4 in order to test that they get
// sorted before being aggregated
writeAsync(batchedWal, event5, 12L)
eventually(timeout(1 second)) {
eventually(timeout(1.second)) {
assert(blockingWal.isBlocked)
assert(batchedWal.invokePrivate(queueLength()) === 3)
}
writeAsync(batchedWal, event4, 10L)
eventually(timeout(1 second)) {
eventually(timeout(1.second)) {
assert(walBatchingThreadPool.getActiveCount === 5)
assert(batchedWal.invokePrivate(queueLength()) === 4)
}
@ -528,7 +528,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
val buffer = wrapArrayArrayByte(Array(event1))
val queuedEvents = Set(event2, event3, event4, event5)
eventually(timeout(1 second)) {
eventually(timeout(1.second)) {
assert(batchedWal.invokePrivate(queueLength()) === 0)
verify(wal, times(1)).write(meq(buffer), meq(3L))
// the file name should be the timestamp of the last record, as events should be naturally
@ -559,7 +559,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
// The queue.take() immediately takes the 3, and there is nothing left in the queue at that
// moment. Then the promise blocks the writing of 3. The rest get queued.
val promise1 = writeAsync(batchedWal, event1, 3L)
eventually(timeout(1 second)) {
eventually(timeout(1.second)) {
assert(blockingWal.isBlocked)
assert(batchedWal.invokePrivate(queueLength()) === 0)
}
@ -567,7 +567,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
val promise2 = writeAsync(batchedWal, event2, 5L)
val promise3 = writeAsync(batchedWal, event3, 8L)
eventually(timeout(1 second)) {
eventually(timeout(1.second)) {
assert(walBatchingThreadPool.getActiveCount === 3)
assert(blockingWal.isBlocked)
assert(batchedWal.invokePrivate(queueLength()) === 2) // event1 is being written
@ -576,7 +576,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
val writePromises = Seq(promise1, promise2, promise3)
batchedWal.close()
eventually(timeout(1 second)) {
eventually(timeout(1.second)) {
assert(writePromises.forall(_.isCompleted))
assert(writePromises.forall(_.future.value.get.isFailure)) // all should have failed
}
@ -772,7 +772,7 @@ object WriteAheadLogSuite {
override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = {
isWriteCalled = true
eventually(Eventually.timeout(2 second)) {
eventually(Eventually.timeout(2.second)) {
assert(!blockWrite)
}
wal.write(record, time)