[SPARK-3850] Trim trailing spaces for core.
Author: Reynold Xin <rxin@databricks.com>
Closes #6533 from rxin/whitespace-2 and squashes the following commits:
038314c [Reynold Xin] [SPARK-3850] Trim trailing spaces for core.
(cherry picked from commit 74fdc97c72)
Signed-off-by: Reynold Xin <rxin@databricks.com>
Conflicts:
core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
parent 2016927f70
commit a7c217166b
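The change itself is mechanical: each modified line in the hunks below differs from the line it replaces only by trailing whitespace. As a rough, hypothetical illustration of how such a cleanup could be scripted (this is not the tooling used for this commit; the object name and the hard-coded "core/src" default are assumptions for the sketch), a small Scala helper might look like this:

import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._

// Hypothetical helper, not part of this commit: strips trailing spaces and tabs
// from every .scala file under the given root, rewriting files in place.
object TrimTrailingWhitespace {
  def main(args: Array[String]): Unit = {
    val root: Path = Paths.get(args.headOption.getOrElse("core/src"))
    val scalaFiles = Files.walk(root).iterator().asScala
      .filter(p => Files.isRegularFile(p) && p.toString.endsWith(".scala"))
    scalaFiles.foreach { path =>
      val lines = Files.readAllLines(path).asScala.toList
      val trimmed = lines.map(_.replaceAll("""[ \t]+$""", ""))
      if (trimmed != lines) {
        Files.write(path, trimmed.asJava) // rewrite only when something actually changed
      }
    }
  }
}

In practice a build-time style check (for example a scalastyle whitespace rule) would flag these lines instead of fixing them after the fact.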
@@ -34,8 +34,8 @@ case class Aggregator[K, V, C] (
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {

  // When spilling is enabled sorting will happen externally, but not necessarily with an
  // ExternalSorter.
  private val isSpillEnabled = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)

  @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")

@@ -150,7 +150,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
  }

  override def isCompleted: Boolean = jobWaiter.jobFinished

  override def isCancelled: Boolean = _cancelled

  override def value: Option[Try[T]] = {

@@ -29,7 +29,7 @@ import org.apache.spark.util.{ThreadUtils, Utils}

/**
 * A heartbeat from executors to the driver. This is a shared message used by several internal
 * components to convey liveness or execution information for in-progress tasks. It will also
 * expire the hosts that have not heartbeated for more than spark.network.timeout.
 */
private[spark] case class Heartbeat(

@@ -43,8 +43,8 @@ private[spark] case class Heartbeat(
 */
private[spark] case object TaskSchedulerIsSet

private[spark] case object ExpireDeadHosts

private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)

/**

@@ -62,18 +62,18 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)

  // "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
  // "milliseconds"
  private val slaveTimeoutMs =
    sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", "120s")
  private val executorTimeoutMs =
    sc.conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms") * 1000

  // "spark.network.timeoutInterval" uses "seconds", while
  // "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
  private val timeoutIntervalMs =
    sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s")
  private val checkTimeoutIntervalMs =
    sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000

  private var timeoutCheckingTask: ScheduledFuture[_] = null

  // "eventLoopThread" is used to run some pretty fast actions. The actions running in it should not

@@ -140,7 +140,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
      }
    }
  }

  override def onStop(): Unit = {
    if (timeoutCheckingTask != null) {
      timeoutCheckingTask.cancel(true)
@@ -50,8 +50,8 @@ private[spark] class HttpFileServer(

  def stop() {
    httpServer.stop()

    // If we only stop sc, but the driver process still run as a services then we need to delete
    // the tmp dir, if not, it will create too many tmp dirs
    try {
      Utils.deleteRecursively(baseDir)

@@ -227,7 +227,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
  def getSizeAsBytes(key: String, defaultValue: String): Long = {
    Utils.byteStringAsBytes(get(key, defaultValue))
  }

  /**
   * Get a size parameter as Kibibytes; throws a NoSuchElementException if it's not set. If no
   * suffix is provided then Kibibytes are assumed.

@@ -244,7 +244,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
  def getSizeAsKb(key: String, defaultValue: String): Long = {
    Utils.byteStringAsKb(get(key, defaultValue))
  }

  /**
   * Get a size parameter as Mebibytes; throws a NoSuchElementException if it's not set. If no
   * suffix is provided then Mebibytes are assumed.

@@ -261,7 +261,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
  def getSizeAsMb(key: String, defaultValue: String): Long = {
    Utils.byteStringAsMb(get(key, defaultValue))
  }

  /**
   * Get a size parameter as Gibibytes; throws a NoSuchElementException if it's not set. If no
   * suffix is provided then Gibibytes are assumed.

@@ -278,7 +278,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
  def getSizeAsGb(key: String, defaultValue: String): Long = {
    Utils.byteStringAsGb(get(key, defaultValue))
  }

  /** Get a parameter as an Option */
  def getOption(key: String): Option[String] = {
    Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))

@@ -480,7 +480,7 @@ private[spark] object SparkConf extends Logging {
        "spark.kryoserializer.buffer.mb was previously specified as '0.064'. Fractional values " +
          "are no longer accepted. To specify the equivalent now, one may use '64k'.")
      )

    Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
  }

@@ -508,7 +508,7 @@ private[spark] object SparkConf extends Logging {
    "spark.reducer.maxSizeInFlight" -> Seq(
      AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
    "spark.kryoserializer.buffer" ->
      Seq(AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
        translation = s => s"${(s.toDouble * 1000).toInt}k")),
    "spark.kryoserializer.buffer.max" -> Seq(
      AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
@@ -51,7 +51,7 @@ private[spark] object TestUtils {
      classpathUrls: Seq[URL] = Seq()): URL = {
    val tempDir = Utils.createTempDir()
    val files1 = for (name <- classNames) yield {
      createCompiledClass(name, tempDir, toStringValue, classpathUrls = classpathUrls)
    }
    val files2 = for ((childName, baseName) <- classNamesWithBase) yield {
      createCompiledClass(childName, tempDir, toStringValue, baseName, classpathUrls)

@@ -137,7 +137,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double])
   */
  def sample(withReplacement: Boolean, fraction: JDouble): JavaDoubleRDD =
    sample(withReplacement, fraction, Utils.random.nextLong)

  /**
   * Return a sampled subset of this RDD.
   */

@@ -101,7 +101,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])

  /**
   * Return a sampled subset of this RDD.
   *
   * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
   * @param fraction expected size of the sample as a fraction of this RDD's size
   *  without replacement: probability that each element is chosen; fraction must be [0, 1]

@@ -109,10 +109,10 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
   */
  def sample(withReplacement: Boolean, fraction: Double): JavaRDD[T] =
    sample(withReplacement, fraction, Utils.random.nextLong)

  /**
   * Return a sampled subset of this RDD.
   *
   * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
   * @param fraction expected size of the sample as a fraction of this RDD's size
   *  without replacement: probability that each element is chosen; fraction must be [0, 1]
@@ -797,10 +797,10 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:

  val bufferSize = SparkEnv.get.conf.getInt("spark.buffer.size", 65536)

  /**
   * We try to reuse a single Socket to transfer accumulator updates, as they are all added
   * by the DAGScheduler's single-threaded actor anyway.
   */
  @transient var socket: Socket = _

  def openSocket(): Socket = synchronized {

@@ -44,11 +44,11 @@ private[spark] class RBackend {
    bossGroup = new NioEventLoopGroup(2)
    val workerGroup = bossGroup
    val handler = new RBackendHandler(this)

    bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup)
      .channel(classOf[NioServerSocketChannel])

    bootstrap.childHandler(new ChannelInitializer[SocketChannel]() {
      def initChannel(ch: SocketChannel): Unit = {
        ch.pipeline()

@@ -77,7 +77,7 @@ private[r] class RBackendHandler(server: RBackend)
    val reply = bos.toByteArray
    ctx.write(reply)
  }

  override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
    ctx.flush()
  }

@@ -869,7 +869,7 @@ private[spark] object SparkSubmitUtils {
      md.addDependency(dd)
    }
  }

  /** Add exclusion rules for dependencies already included in the spark-assembly */
  def addExclusionRules(
      ivySettings: IvySettings,

@@ -23,7 +23,7 @@ import org.apache.spark.util.Utils
/**
 * Command-line parser for the master.
 */
private[history] class HistoryServerArguments(conf: SparkConf, args: Array[String])
  extends Logging {
  private var propertiesFile: String = null

@@ -32,7 +32,7 @@ import org.apache.spark.deploy.SparkCuratorUtil
private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serialization: Serialization)
  extends PersistenceEngine
  with Logging {

  private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
  private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
@@ -43,22 +43,22 @@ class TaskMetrics extends Serializable {
  private var _hostname: String = _
  def hostname: String = _hostname
  private[spark] def setHostname(value: String) = _hostname = value

  /**
   * Time taken on the executor to deserialize this task
   */
  private var _executorDeserializeTime: Long = _
  def executorDeserializeTime: Long = _executorDeserializeTime
  private[spark] def setExecutorDeserializeTime(value: Long) = _executorDeserializeTime = value


  /**
   * Time the executor spends actually running the task (including fetching shuffle data)
   */
  private var _executorRunTime: Long = _
  def executorRunTime: Long = _executorRunTime
  private[spark] def setExecutorRunTime(value: Long) = _executorRunTime = value

  /**
   * The number of bytes this task transmitted back to the driver as the TaskResult
   */

@@ -315,7 +315,7 @@ class ShuffleReadMetrics extends Serializable {
  def remoteBlocksFetched: Int = _remoteBlocksFetched
  private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
  private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value

  /**
   * Number of local blocks fetched in this shuffle by this task
   */

@@ -333,7 +333,7 @@ class ShuffleReadMetrics extends Serializable {
  def fetchWaitTime: Long = _fetchWaitTime
  private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
  private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value

  /**
   * Total number of remote bytes read from the shuffle by this task
   */

@@ -381,7 +381,7 @@ class ShuffleWriteMetrics extends Serializable {
  def shuffleBytesWritten: Long = _shuffleBytesWritten
  private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
  private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value

  /**
   * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
   */

@@ -389,7 +389,7 @@ class ShuffleWriteMetrics extends Serializable {
  def shuffleWriteTime: Long = _shuffleWriteTime
  private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
  private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value

  /**
   * Total number of records written to the shuffle by this task
   */
@@ -26,9 +26,9 @@ import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem

private[spark] class Slf4jSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager)
  extends Sink {
  val SLF4J_DEFAULT_PERIOD = 10
  val SLF4J_DEFAULT_UNIT = "SECONDS"

@@ -20,4 +20,4 @@ package org.apache.spark.metrics
/**
 * Sinks used in Spark's metrics system.
 */
package object sink

@@ -85,9 +85,9 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
          numPartsToTry = partsScanned * 4
        } else {
          // the left side of max is >=1 whenever partsScanned >= 2
          numPartsToTry = Math.max(1,
            (1.5 * num * partsScanned / results.size).toInt - partsScanned)
          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
        }
      }

@@ -196,7 +196,7 @@ class NewHadoopRDD[K, V](
  override def getPreferredLocations(hsplit: Partition): Seq[String] = {
    val split = hsplit.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value
    val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
      case Some(c) =>
        try {
          val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
          Some(HadoopRDD.convertSplitLocationInfo(infos))
@@ -328,7 +328,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    reduceByKeyLocally(func)
  }

  /**
   * Count the number of elements for each key, collecting the results to a local Map.
   *
   * Note that this method should only be used if the resulting map is expected to be small, as

@@ -41,7 +41,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
   *
   * @param logData Stream containing event log data.
   * @param sourceName Filename (or other source identifier) from whence @logData is being read
   * @param maybeTruncated Indicate whether log file might be truncated (some abnormal situations
   *        encountered, log file might not finished writing) or not
   */
  def replay(

@@ -62,7 +62,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
          if (!maybeTruncated || lines.hasNext) {
            throw jpe
          } else {
            logWarning(s"Got JsonParseException from log file $sourceName" +
              s" at line $lineNumber, the file might not have finished writing cleanly.")
          }
        }

@@ -125,7 +125,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
    if (interruptThread && taskThread != null) {
      taskThread.interrupt()
    }
  }
}

/**

@@ -861,9 +861,9 @@ private[spark] class TaskSetManager(
      case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
      case _ => null
    }

    if (localityWaitKey != null) {
      conf.getTimeAsMs(localityWaitKey, defaultWait)
    } else {
      0L
    }
@@ -79,7 +79,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    override def onStart() {
      // Periodically revive offers to allow delay scheduling to work
      val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

      reviveThread.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          Option(self).foreach(_.send(ReviveOffers))

@@ -37,14 +37,14 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
        .newBuilder()
        .setMode(Volume.Mode.RW)
      spec match {
        case Array(container_path) =>
          Some(vol.setContainerPath(container_path))
        case Array(container_path, "rw") =>
          Some(vol.setContainerPath(container_path))
        case Array(container_path, "ro") =>
          Some(vol.setContainerPath(container_path)
            .setMode(Volume.Mode.RO))
        case Array(host_path, container_path) =>
          Some(vol.setContainerPath(container_path)
            .setHostPath(host_path))
        case Array(host_path, container_path, "rw") =>

@@ -52,7 +52,7 @@ class KryoSerializer(conf: SparkConf)
  with Serializable {

  private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k")

  if (bufferSizeKb >= ByteUnit.GiB.toKiB(2)) {
    throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than " +
      s"2048 mb, got: + ${ByteUnit.KiB.toMiB(bufferSizeKb)} mb.")

@@ -80,7 +80,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
      blocksByAddress,
      serializer,
      // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)
    val itr = blockFetcherItr.flatMap(unpackBlock)

    val completionIter = CompletionIterator[T, Iterator[T]](itr, {
@@ -83,7 +83,7 @@ private[v1] class OneStageResource(ui: SparkUI) {
    withStageAttempt(stageId, stageAttemptId) { stage =>
      val tasks = stage.ui.taskData.values.map{AllStagesResource.convertTaskData}.toIndexedSeq
        .sorted(OneStageResource.ordering(sortBy))
      tasks.slice(offset, offset + length)
    }
  }

@@ -292,16 +292,16 @@ class BlockManagerMasterEndpoint(
    blockManagerIdByExecutor.get(id.executorId) match {
      case Some(oldId) =>
        // A block manager of the same executor already exists, so remove it (assumed dead)
        logError("Got two different block manager registrations on same executor - "
          + s" will replace old one $oldId with new one $id")
        removeExecutor(id.executorId)
      case None =>
    }
    logInfo("Registering block manager %s with %s RAM, %s".format(
      id.hostPort, Utils.bytesToString(maxMemSize), id))

    blockManagerIdByExecutor(id.executorId) = id

    blockManagerInfo(id) = new BlockManagerInfo(
      id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
  }

@@ -151,7 +151,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
    try {
      Utils.removeShutdownHook(shutdownHook)
    } catch {
      case e: Exception =>
        logError(s"Exception while removing shutdown hook.", e)
    }
    doStop()

@@ -62,12 +62,12 @@ private[spark] abstract class WebUI(
    tab.pages.foreach(attachPage)
    tabs += tab
  }

  def detachTab(tab: WebUITab) {
    tab.pages.foreach(detachPage)
    tabs -= tab
  }

  def detachPage(page: WebUIPage) {
    pageToHandlers.remove(page).foreach(_.foreach(detachHandler))
  }
@@ -119,7 +119,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
      "failedStages" -> failedStages.size
    )
  }

  // These collections may grow arbitrarily, but once Spark becomes idle they should shrink back to
  // some bound based on the `spark.ui.retainedStages` and `spark.ui.retainedJobs` settings:
  private[spark] def getSizesOfSoftSizeLimitedCollections: Map[String, Int] = {

@@ -40,7 +40,7 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
  self =>

  private var sparkContext: SparkContext = null

  /* Cap the capacity of the event queue so we get an explicit error (rather than
   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
  private val EVENT_QUEUE_CAPACITY = 10000

@@ -230,7 +230,7 @@ object SizeEstimator extends Logging {
      val s1 = sampleArray(array, state, rand, drawn, length)
      val s2 = sampleArray(array, state, rand, drawn, length)
      val size = math.min(s1, s2)
      state.size += math.max(s1, s2) +
        (size * ((length - ARRAY_SAMPLE_SIZE) / (ARRAY_SAMPLE_SIZE))).toLong
    }
  }

@@ -238,7 +238,7 @@ object SizeEstimator extends Logging {

  private def sampleArray(
      array: AnyRef,
      state: SearchState,
      rand: Random,
      drawn: OpenHashSet[Int],
      length: Int): Long = {

@@ -89,9 +89,9 @@ class ExternalAppendOnlyMap[K, V, C](

  // Number of bytes spilled in total
  private var _diskBytesSpilled = 0L

  // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
  private val fileBufferSize =
    sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024

  // Write metrics for current spill

@@ -109,7 +109,7 @@ private[spark] class ExternalSorter[K, V, C](

  private val conf = SparkEnv.get.conf
  private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)

  // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
  private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
  private val transferToEnabled = conf.getBoolean("spark.file.transferTo", true)
@@ -119,7 +119,7 @@ class FailureSuite extends FunSuite with LocalSparkContext {
      sc.parallelize(1 to 10, 2).map(x => a).count()
    }
    assert(thrown.getClass === classOf[SparkException])
    assert(thrown.getMessage.contains("NotSerializableException") ||
      thrown.getCause.getClass === classOf[NotSerializableException])

    // Non-serializable closure in an earlier stage

@@ -127,7 +127,7 @@ class FailureSuite extends FunSuite with LocalSparkContext {
      sc.parallelize(1 to 10, 2).map(x => (x, a)).partitionBy(new HashPartitioner(3)).count()
    }
    assert(thrown1.getClass === classOf[SparkException])
    assert(thrown1.getMessage.contains("NotSerializableException") ||
      thrown1.getCause.getClass === classOf[NotSerializableException])

    // Non-serializable closure in foreach function

@@ -135,7 +135,7 @@ class FailureSuite extends FunSuite with LocalSparkContext {
      sc.parallelize(1 to 10, 2).foreach(x => println(a))
    }
    assert(thrown2.getClass === classOf[SparkException])
    assert(thrown2.getMessage.contains("NotSerializableException") ||
      thrown2.getCause.getClass === classOf[NotSerializableException])

    FailureSuiteState.clear()

@@ -29,11 +29,11 @@ class ImplicitOrderingSuite extends FunSuite with LocalSparkContext {

    // These RDD methods are in the companion object so that the unserializable ScalaTest Engine
    // won't be reachable from the closure object

    // Infer orderings after basic maps to particular types
    val basicMapExpectations = ImplicitOrderingSuite.basicMapExpectations(rdd)
    basicMapExpectations.map({case (met, explain) => assert(met, explain)})

    // Infer orderings for other RDD methods
    val otherRDDMethodExpectations = ImplicitOrderingSuite.otherRDDMethodExpectations(rdd)
    otherRDDMethodExpectations.map({case (met, explain) => assert(met, explain)})
@@ -50,30 +50,30 @@ private object ImplicitOrderingSuite {
  class OrderedClass extends Ordered[OrderedClass] {
    override def compare(o: OrderedClass): Int = throw new UnsupportedOperationException
  }

  def basicMapExpectations(rdd: RDD[Int]): List[(Boolean, String)] = {
    List((rdd.map(x => (x, x)).keyOrdering.isDefined,
           "rdd.map(x => (x, x)).keyOrdering.isDefined"),
         (rdd.map(x => (1, x)).keyOrdering.isDefined,
           "rdd.map(x => (1, x)).keyOrdering.isDefined"),
         (rdd.map(x => (x.toString, x)).keyOrdering.isDefined,
           "rdd.map(x => (x.toString, x)).keyOrdering.isDefined"),
         (rdd.map(x => (null, x)).keyOrdering.isDefined,
           "rdd.map(x => (null, x)).keyOrdering.isDefined"),
         (rdd.map(x => (new NonOrderedClass, x)).keyOrdering.isEmpty,
           "rdd.map(x => (new NonOrderedClass, x)).keyOrdering.isEmpty"),
         (rdd.map(x => (new ComparableClass, x)).keyOrdering.isDefined,
           "rdd.map(x => (new ComparableClass, x)).keyOrdering.isDefined"),
         (rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined,
           "rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined"))
  }

  def otherRDDMethodExpectations(rdd: RDD[Int]): List[(Boolean, String)] = {
    List((rdd.groupBy(x => x).keyOrdering.isDefined,
           "rdd.groupBy(x => x).keyOrdering.isDefined"),
         (rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty,
           "rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty"),
         (rdd.groupBy(x => new ComparableClass).keyOrdering.isDefined,
           "rdd.groupBy(x => new ComparableClass).keyOrdering.isDefined"),
         (rdd.groupBy(x => new OrderedClass).keyOrdering.isDefined,
           "rdd.groupBy(x => new OrderedClass).keyOrdering.isDefined"),
@@ -73,22 +73,22 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
    var sc2: SparkContext = null
    SparkContext.clearActiveContext()
    val conf = new SparkConf().setAppName("test").setMaster("local")

    sc = SparkContext.getOrCreate(conf)

    assert(sc.getConf.get("spark.app.name").equals("test"))
    sc2 = SparkContext.getOrCreate(new SparkConf().setAppName("test2").setMaster("local"))
    assert(sc2.getConf.get("spark.app.name").equals("test"))
    assert(sc === sc2)
    assert(sc eq sc2)

    // Try creating second context to confirm that it's still possible, if desired
    sc2 = new SparkContext(new SparkConf().setAppName("test3").setMaster("local")
      .set("spark.driver.allowMultipleContexts", "true"))

    sc2.stop()
  }

  test("BytesWritable implicit conversion is correct") {
    // Regression test for SPARK-3121
    val bytesWritable = new BytesWritable()

@@ -82,7 +82,7 @@ class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
    assert(rdd.count === 100)
    assert(rdd.reduce(_ + _) === 10100)
  }

  test("large id overflow") {
    sc = new SparkContext("local", "test")
    val rdd = new JdbcRDD(

@@ -29,11 +29,11 @@ class MemoryUtilsSuite extends FunSuite with MockitoSugar {

    val sc = mock[SparkContext]
    when(sc.conf).thenReturn(sparkConf)

    // 384 > sc.executorMemory * 0.1 => 512 + 384 = 896
    when(sc.executorMemory).thenReturn(512)
    assert(MemoryUtils.calculateTotalMemory(sc) === 896)

    // 384 < sc.executorMemory * 0.1 => 4096 + (4096 * 0.1) = 4505.6
    when(sc.executorMemory).thenReturn(4096)
    assert(MemoryUtils.calculateTotalMemory(sc) === 4505)
@@ -80,11 +80,11 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Mo
      .set("spark.mesos.executor.docker.image", "spark/mock")
      .set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
      .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")

    val listenerBus = mock[LiveListenerBus]
    listenerBus.post(
      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))

    val sc = mock[SparkContext]
    when(sc.executorMemory).thenReturn(100)
    when(sc.getSparkHome()).thenReturn(Option("/spark-home"))

@@ -63,7 +63,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
    assert(thrown3.getMessage.contains(kryoBufferProperty))
    assert(!thrown3.getMessage.contains(kryoBufferMaxProperty))
  }

  test("basic types") {
    val ser = new KryoSerializer(conf).newInstance()
    def check[T: ClassTag](t: T) {

@@ -25,7 +25,7 @@ import org.apache.spark.rdd.RDD
/* A trivial (but unserializable) container for trivial functions */
class UnserializableClass {
  def op[T](x: T): String = x.toString

  def pred[T](x: T): Boolean = x.toString.length % 2 == 0
}

@@ -47,7 +47,7 @@ class ProactiveClosureSerializationSuite extends FunSuite with SharedSparkContex
    // iterating over a map from transformation names to functions that perform that
    // transformation on a given RDD, creating one test case for each

    for (transformation <-
        Map("map" -> xmap _,
            "flatMap" -> xflatMap _,
            "filter" -> xfilter _,
@@ -60,24 +60,24 @@ class ProactiveClosureSerializationSuite extends FunSuite with SharedSparkContex
      val ex = intercept[SparkException] {
        xf(data, uc)
      }
      assert(ex.getMessage.contains("Task not serializable"),
        s"RDD.$name doesn't proactively throw NotSerializableException")
    }
  }

  private def xmap(x: RDD[String], uc: UnserializableClass): RDD[String] =
    x.map(y => uc.op(y))

  private def xflatMap(x: RDD[String], uc: UnserializableClass): RDD[String] =
    x.flatMap(y => Seq(uc.op(y)))

  private def xfilter(x: RDD[String], uc: UnserializableClass): RDD[String] =
    x.filter(y => uc.pred(y))

  private def xmapPartitions(x: RDD[String], uc: UnserializableClass): RDD[String] =
    x.mapPartitions(_.map(y => uc.op(y)))

  private def xmapPartitionsWithIndex(x: RDD[String], uc: UnserializableClass): RDD[String] =
    x.mapPartitionsWithIndex((_, it) => it.map(y => uc.op(y)))

}

@@ -203,7 +203,7 @@ object TestObjectWithNestedReturns {
  def run(): Int = {
    withSpark(new SparkContext("local", "test")) { sc =>
      val nums = sc.parallelize(Array(1, 2, 3, 4))
      nums.map {x =>
        // this return is fine since it will not transfer control outside the closure
        def foo(): Int = { return 5; 1 }
        foo()

@@ -76,7 +76,7 @@ class RandomSamplerSuite extends FunSuite with Matchers {
  }

  // Returns iterator over gap lengths between samples.
  // This function assumes input data is integers sampled from the sequence of
  // increasing integers: {0, 1, 2, ...}. This works because that is how I generate them,
  // and the samplers preserve their input order
  def gaps(data: Iterator[Int]): Iterator[Int] = {