From 4ec7f631aa23bc0121656dd77f05767f447112c0 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Thu, 11 Apr 2019 13:43:44 -0500
Subject: [PATCH] [SPARK-27404][CORE][SQL][STREAMING][YARN] Fix build warnings for 3.0: postfixOps edition

## What changes were proposed in this pull request?

Fix build warnings -- see some details below. Mostly, though, this removes uses of postfix syntax that cause warnings unless `scala.language.postfixOps` is imported. These occur mostly in expressions like "120000 milliseconds", which I'd like to simplify to forms like "2.minutes" anyway.
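
As an aside (not part of the original PR description), here is a minimal sketch of the syntax change, with a made-up object name: postfix durations such as `120000 milliseconds` produce a feature warning unless `scala.language.postfixOps` is in scope, while the equivalent method-call form needs no extra import.

```scala
import scala.concurrent.duration._

object DurationSyntaxSketch {
  // Postfix operator syntax: emits a "feature" warning unless
  // scala.language.postfixOps is imported.
  // val before: FiniteDuration = 120000 milliseconds

  // Method-call syntax via the DurationInt implicit: no extra import,
  // no warning, and the same duration.
  val after: FiniteDuration = 2.minutes

  def main(args: Array[String]): Unit = {
    println(after.toMillis) // 120000
  }
}
```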

## How was this patch tested?

Existing tests.

Closes #24314 from srowen/SPARK-27404.

Authored-by: Sean Owen
Signed-off-by: Sean Owen
---
 .../shuffle/RetryingBlockFetcherSuite.java | 8 +--
 .../org/apache/spark/BarrierTaskContext.scala | 3 +-
 .../spark/deploy/FaultToleranceTest.scala | 17 +++---
 .../org/apache/spark/rpc/RpcTimeout.scala | 2 +-
 .../apache/spark/scheduler/DAGScheduler.scala | 3 +-
 .../scala/org/apache/spark/ui/UIUtils.scala | 19 +++---
 .../spark/BarrierStageOnSubmittedSuite.scala | 3 +-
 .../apache/spark/ContextCleanerSuite.scala | 16 ++---
 .../scala/org/apache/spark/DriverSuite.scala | 4 +-
 .../apache/spark/JobCancellationSuite.scala | 4 +-
 .../org/apache/spark/SparkConfSuite.scala | 5 +-
 .../org/apache/spark/StatusTrackerSuite.scala | 29 +++++-----
 .../spark/deploy/SparkSubmitSuite.scala | 2 +-
 .../StandaloneDynamicAllocationSuite.scala | 2 +-
 .../spark/deploy/client/AppClientSuite.scala | 2 +-
 .../history/FsHistoryProviderSuite.scala | 11 ++--
 .../deploy/history/HistoryServerSuite.scala | 20 +------
 .../spark/deploy/master/MasterSuite.scala | 11 ++--
 .../spark/deploy/worker/WorkerSuite.scala | 6 +-
 .../apache/spark/executor/ExecutorSuite.scala | 1 -
 .../spark/launcher/LauncherBackendSuite.scala | 5 +-
 .../spark/rdd/AsyncRDDActionsSuite.scala | 4 +-
 .../spark/rdd/LocalCheckpointSuite.scala | 3 +-
 .../org/apache/spark/rpc/RpcEnvSuite.scala | 55 +++++++++---------
 .../CoarseGrainedSchedulerBackendSuite.scala | 2 +-
 .../spark/scheduler/DAGSchedulerSuite.scala | 2 +-
 .../OutputCommitCoordinatorSuite.scala | 3 +-
 .../scheduler/SchedulerIntegrationSuite.scala | 2 +-
 .../scheduler/TaskResultGetterSuite.scala | 7 +--
 .../BlockManagerReplicationSuite.scala | 15 +++--
 .../spark/storage/BlockManagerSuite.scala | 22 +++----
 .../org/apache/spark/ui/UISeleniumSuite.scala | 54 ++++++++---------
 .../scala/org/apache/spark/ui/UISuite.scala | 4 +-
 .../apache/spark/util/EventLoopSuite.scala | 19 +++---
 .../ExternalAppendOnlyMapSuite.scala | 3 +-
 .../sql/jdbc/DockerJDBCIntegrationSuite.scala | 2 +-
 .../KafkaDontFailOnDataLossSuite.scala | 2 +-
 .../spark/sql/kafka010/KafkaTestUtils.scala | 9 ++-
 .../kafka010/DirectKafkaStreamSuite.scala | 21 ++++---
 .../kinesis/KinesisCheckpointerSuite.scala | 11 ++--
 .../kinesis/KinesisReceiverSuite.scala | 6 +-
 .../kinesis/KinesisStreamSuite.scala | 42 ++++++++------
 .../org/apache/spark/ml/MLEventsSuite.scala | 41 +++++++------
 .../deploy/yarn/BaseYarnClusterSuite.scala | 4 +-
 .../spark/deploy/yarn/YarnClusterSuite.scala | 8 +--
 .../yarn/YarnShuffleServiceSuite.scala | 5 +-
 .../sql/sources/v2/JavaSimpleBatchTable.java | 54 -----------------
 .../sources/v2/JavaSimpleReaderFactory.java | 50 ++++++++++++++++
 .../sql/sources/v2/JavaSimpleScanBuilder.java | 47 +++++++++++++++
 .../apache/spark/sql/CachedTableSuite.scala | 11 ++--
 .../apache/spark/sql/DatasetCacheSuite.scala | 4 +-
 .../sources/TextSocketStreamSuite.scala | 2 +-
 .../state/StateStoreCoordinatorSuite.scala | 8 +--
 .../streaming/state/StateStoreSuite.scala | 2 +-
 .../StreamingQueryManagerSuite.scala | 58 +++++++++----------
 ...StreamingQueryStatusAndProgressSuite.scala | 3 +-
 .../hive/thriftserver/UISeleniumSuite.scala | 4 +-
 .../receiver/ReceivedBlockHandler.scala | 2 +-
 .../util/FileBasedWriteAheadLog.scala | 5 +-
 .../spark/streaming/CheckpointSuite.scala | 4 +-
 .../streaming/ReceivedBlockHandlerSuite.scala | 3 +-
 .../streaming/ReceivedBlockTrackerSuite.scala | 4 +-
 .../spark/streaming/ReceiverSuite.scala | 22 +++----
 .../streaming/StreamingContextSuite.scala | 52 ++++++++---------
 .../streaming/StreamingListenerSuite.scala | 4 +-
 .../spark/streaming/UISeleniumSuite.scala | 8 +--
 .../receiver/BlockGeneratorSuite.scala | 16 ++---
 .../ExecutorAllocationManagerSuite.scala | 2 +-
 .../scheduler/JobGeneratorSuite.scala | 5 +-
 .../scheduler/ReceiverTrackerSuite.scala | 8 +--
 .../streaming/util/WriteAheadLogSuite.scala | 20 +++---
 71 files changed, 458 insertions(+), 459 deletions(-)
 create mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReaderFactory.java
 create mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleScanBuilder.java

diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
index a530e16734..6f90df5f61 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
@@ -46,9 +46,9 @@ import static org.apache.spark.network.shuffle.RetryingBlockFetcher.BlockFetchSt
  */
 public class RetryingBlockFetcherSuite {
 
-  ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
-  ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
-  ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
+  private final ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
+  private final ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
+  private final ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
 
   @Test
   public void testNoFailures() throws IOException, InterruptedException {
@@ -291,7 +291,7 @@ public class RetryingBlockFetcherSuite {
     }
 
     assertNotNull(stub);
-    stub.when(fetchStarter).createAndStart(any(), anyObject());
+    stub.when(fetchStarter).createAndStart(any(), any());
     String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
     new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
   }
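
The Scala hunks that follow replace postfix durations with equivalent method-call forms, for example `31536000 /* = 3600 * 24 * 365 */ seconds` becomes `365.days` and `120000 milliseconds` becomes `2.minutes`. As an illustrative aside (not part of the patch), the equivalences are easy to verify:

```scala
import scala.concurrent.duration._

object DurationEquivalenceCheck {
  def main(args: Array[String]): Unit = {
    // The rewritten values below denote exactly the same durations.
    assert(365.days == 31536000.seconds)     // BarrierTaskContext barrier timeout
    assert(2.minutes == 120000.milliseconds) // FaultToleranceTest awaitResult timeout
    assert(10.minutes == 600.seconds)        // DAGScheduler BlockManagerHeartbeat timeout
  }
}
```
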
diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index 2d842b98ea..a354f44a1b 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -20,7 +20,6 @@ package org.apache.spark
 import java.util.{Properties, Timer, TimerTask}
 
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.executor.TaskMetrics
@@ -122,7 +121,7 @@ class BarrierTaskContext private[spark] (
         barrierEpoch),
       // Set a fixed timeout for RPC here, so users shall get a SparkException thrown by
       // BarrierCoordinator on timeout, instead of RPCTimeoutException from the RPC framework.
-      timeout = new RpcTimeout(31536000 /* = 3600 * 24 * 365 */ seconds, "barrierTimeout"))
+      timeout = new RpcTimeout(365.days, "barrierTimeout"))
     barrierEpoch += 1
     logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) finished " +
       "global sync successfully, waited for " +
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index a662430120..99f8412340 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -26,7 +26,6 @@ import scala.collection.mutable.ListBuffer
 import scala.concurrent.{Future, Promise}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
-import scala.language.postfixOps
 import scala.sys.process._
 
 import org.json4s._
@@ -112,7 +111,7 @@ private object FaultToleranceTest extends App with Logging {
     assertValidClusterState()
 
     killLeader()
-    delay(30 seconds)
+    delay(30.seconds)
     assertValidClusterState()
     createClient()
     assertValidClusterState()
@@ -126,12 +125,12 @@ private object FaultToleranceTest extends App with Logging {
 
     killLeader()
     addMasters(1)
-    delay(30 seconds)
+    delay(30.seconds)
     assertValidClusterState()
 
     killLeader()
     addMasters(1)
-    delay(30 seconds)
+    delay(30.seconds)
     assertValidClusterState()
   }
 
@@ -156,7 +155,7 @@ private object FaultToleranceTest extends App with Logging {
     killLeader()
     workers.foreach(_.kill())
     workers.clear()
-    delay(30 seconds)
+    delay(30.seconds)
     addWorkers(2)
     assertValidClusterState()
   }
@@ -174,7 +173,7 @@ private object FaultToleranceTest extends App with Logging {
 
     (1 to 3).foreach { _ =>
       killLeader()
-      delay(30 seconds)
+      delay(30.seconds)
       assertValidClusterState()
       assertTrue(getLeader == masters.head)
       addMasters(1)
@@ -264,7 +263,7 @@ private object FaultToleranceTest extends App with Logging {
     }
 
     // Avoid waiting indefinitely (e.g., we could register but get no executors).
-    assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
+    assertTrue(ThreadUtils.awaitResult(f, 2.minutes))
   }
 
   /**
@@ -317,7 +316,7 @@ private object FaultToleranceTest extends App with Logging {
     }
 
     try {
-      assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
+      assertTrue(ThreadUtils.awaitResult(f, 2.minutes))
     } catch {
       case e: TimeoutException =>
         logError("Master states: " + masters.map(_.state))
@@ -421,7 +420,7 @@ private object SparkDocker {
   }
 
   dockerCmd.run(ProcessLogger(findIpAndLog _))
-  val ip = ThreadUtils.awaitResult(ipPromise.future, 30 seconds)
+  val ip = ThreadUtils.awaitResult(ipPromise.future, 30.seconds)
   val dockerId = Docker.getLastProcessId
   (ip, dockerId, outFile)
 }
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
index 3dc41f7f12..770ae2f1dd 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
@@ -52,7 +52,7 @@ private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: S
    *
    * @note This can be used in the recover callback of a Future to add to a TimeoutException
    * Example:
-   *    val timeout = new RpcTimeout(5 millis, "short timeout")
+   *    val timeout = new RpcTimeout(5.milliseconds, "short timeout")
    *    Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout)
    */
   def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d967d38c52..524b0c4f6c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -27,7 +27,6 @@ import scala.collection.Map
 import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
 import scala.concurrent.duration._
 import scala.language.existentials
-import scala.language.postfixOps
 import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.SerializationUtils
@@ -270,7 +269,7 @@ private[spark] class DAGScheduler(
     listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates,
       Some(executorUpdates)))
     blockManagerMaster.driverEndpoint.askSync[Boolean](
-      BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
+      BlockManagerHeartbeat(blockManagerId), new RpcTimeout(10.minutes, "BlockManagerHeartbeat"))
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index d7bda8b80c..11647c0c7c 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -109,12 +109,12 @@ private[spark] object UIUtils extends Logging {
       }
     }
     // if time is more than a year
-    return s"$yearString $weekString $dayString"
+    s"$yearString $weekString $dayString"
   } catch {
     case e: Exception =>
       logError("Error converting time to string", e)
       // if there is some error, return blank string
-      return ""
+      ""
   }
 }
 
@@ -336,7 +336,7 @@ private[spark] object UIUtils extends Logging {
     def getHeaderContent(header: String): Seq[Node] = {
       if (newlinesInHeader) {
-        { header.split("\n").map { case t =>
-          <li> {t} </li>
-        } }
+        { header.split("\n").map(t =>
+          <li> {t} </li>
+        ) }
       } else {
         Text(header)
@@ -446,7 +446,7 @@ private[spark] object UIUtils extends Logging {
   * the whole string will rendered as a simple escaped text.
   *
   * Note: In terms of security, only anchor tags with root relative links are supported. So any
-  * attempts to embed links outside Spark UI, or other tags like {@code