[SPARK-26463][CORE] Use ConfigEntry for hardcoded configs for scheduler categories.

## What changes were proposed in this pull request?

The PR converts the hardcoded `spark.dynamicAllocation`, `spark.scheduler`, `spark.rpc`, `spark.task`, `spark.speculation`, and `spark.cleaner` configuration keys to `ConfigEntry` definitions.
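
For reference, a minimal sketch of the conversion pattern applied throughout the diff, taken from the `ContextCleaner` change below. In the PR the entry is defined directly in the `org.apache.spark.internal.config` package object; the wrapper object here exists only so the sketch is self-contained:

```scala
package org.apache.spark.internal.config

import java.util.concurrent.TimeUnit

// Illustration only: in the PR this entry lives directly in the config package object.
private[spark] object CleanerConfigSketch {
  private[spark] val CLEANER_PERIODIC_GC_INTERVAL =
    ConfigBuilder("spark.cleaner.periodicGC.interval")
      .timeConf(TimeUnit.SECONDS)            // values parse as times and are returned in seconds
      .createWithDefaultString("30min")
}

// Call site, before:  sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min")
// Call site, after:   sc.conf.get(CLEANER_PERIODIC_GC_INTERVAL)
```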

## How was this patch tested?

Existing tests

Closes #23416 from kiszk/SPARK-26463.

Authored-by: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Kazuaki Ishizaki 2019-01-22 07:44:36 -06:00 committed by Sean Owen
parent d2e86cb3cd
commit 7bf0794651
68 changed files with 521 additions and 280 deletions


@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, ThreadUtils, Utils}
@ -83,8 +84,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
* on the driver, this may happen very occasionally or not at all. Not cleaning at all may
* lead to executors running out of disk space after a while.
*/
private val periodicGCInterval =
sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min")
private val periodicGCInterval = sc.conf.get(CLEANER_PERIODIC_GC_INTERVAL)
/**
* Whether the cleaning thread will block on cleanup tasks (other than shuffle, which
@ -96,8 +96,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
* for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
* longer in scope.
*/
private val blockOnCleanupTasks = sc.conf.getBoolean(
"spark.cleaner.referenceTracking.blocking", true)
private val blockOnCleanupTasks = sc.conf.get(CLEANER_REFERENCE_TRACKING_BLOCKING)
/**
* Whether the cleaning thread will block on shuffle cleanup tasks.
@ -109,8 +108,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
* until the real RPC issue (referred to in the comment above `blockOnCleanupTasks`) is
* resolved.
*/
private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
"spark.cleaner.referenceTracking.blocking.shuffle", false)
private val blockOnShuffleCleanupTasks =
sc.conf.get(CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE)
@volatile private var stopped = false


@ -107,28 +107,25 @@ private[spark] class ExecutorAllocationManager(
private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
// How long there must be backlogged tasks for before an addition is triggered (seconds)
private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
private val schedulerBacklogTimeoutS = conf.get(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT)
// Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded
private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", s"${schedulerBacklogTimeoutS}s")
private val sustainedSchedulerBacklogTimeoutS =
conf.get(DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT)
// How long an executor must be idle for before it is removed (seconds)
private val executorIdleTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.executorIdleTimeout", "60s")
private val executorIdleTimeoutS = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)
private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.cachedExecutorIdleTimeout", s"${Integer.MAX_VALUE}s")
private val cachedExecutorIdleTimeoutS = conf.get(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)
// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
private val testing = conf.get(DYN_ALLOCATION_TESTING)
// TODO: The default value of 1 for spark.executor.cores works right now because dynamic
// allocation is only supported for YARN and the default number of cores per executor in YARN is
// 1, but it might need to be attained differently for different cluster managers
private val tasksPerExecutorForFullParallelism =
conf.get(EXECUTOR_CORES) / conf.getInt("spark.task.cpus", 1)
conf.get(EXECUTOR_CORES) / conf.get(CPUS_PER_TASK)
private val executorAllocationRatio =
conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)
@ -195,27 +192,29 @@ private[spark] class ExecutorAllocationManager(
*/
private def validateSettings(): Unit = {
if (minNumExecutors < 0 || maxNumExecutors < 0) {
throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be positive!")
throw new SparkException(
s"${DYN_ALLOCATION_MIN_EXECUTORS.key} and ${DYN_ALLOCATION_MAX_EXECUTORS.key} must be " +
"positive!")
}
if (maxNumExecutors == 0) {
throw new SparkException("spark.dynamicAllocation.maxExecutors cannot be 0!")
throw new SparkException(s"${DYN_ALLOCATION_MAX_EXECUTORS.key} cannot be 0!")
}
if (minNumExecutors > maxNumExecutors) {
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
throw new SparkException(s"${DYN_ALLOCATION_MIN_EXECUTORS.key} ($minNumExecutors) must " +
s"be less than or equal to ${DYN_ALLOCATION_MAX_EXECUTORS.key} ($maxNumExecutors)!")
}
if (schedulerBacklogTimeoutS <= 0) {
throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
throw new SparkException(s"${DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
}
if (sustainedSchedulerBacklogTimeoutS <= 0) {
throw new SparkException(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
}
if (executorIdleTimeoutS < 0) {
throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be >= 0!")
throw new SparkException(s"${DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key} must be >= 0!")
}
if (cachedExecutorIdleTimeoutS < 0) {
throw new SparkException("spark.dynamicAllocation.cachedExecutorIdleTimeout must be >= 0!")
throw new SparkException(s"${DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key} must be >= 0!")
}
// Require external shuffle service for dynamic allocation
// Otherwise, we may lose shuffle files when killing executors
@ -224,12 +223,12 @@ private[spark] class ExecutorAllocationManager(
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
if (tasksPerExecutorForFullParallelism == 0) {
throw new SparkException(s"${EXECUTOR_CORES.key} must not be < spark.task.cpus.")
throw new SparkException(s"${EXECUTOR_CORES.key} must not be < ${CPUS_PER_TASK.key}.")
}
if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
throw new SparkException(
"spark.dynamicAllocation.executorAllocationRatio must be > 0 and <= 1.0")
s"${DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO.key} must be > 0 and <= 1.0")
}
}


@ -24,6 +24,7 @@ import scala.concurrent.Future
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.Network
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerId
@ -74,18 +75,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
// executor ID -> timestamp of when the last heartbeat from this executor was received
private val executorLastSeen = new mutable.HashMap[String, Long]
// "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
// "milliseconds"
private val executorTimeoutMs =
sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s")
private val executorTimeoutMs = sc.conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT)
// "spark.network.timeoutInterval" uses "seconds", while
// "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
private val timeoutIntervalMs =
sc.conf.get(config.STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL)
private val checkTimeoutIntervalMs =
sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000
private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
private var timeoutCheckingTask: ScheduledFuture[_] = null


@ -288,7 +288,7 @@ private[spark] class SecurityManager(
* @return Whether to enable encryption when connecting to services that support it.
*/
def isEncryptionEnabled(): Boolean = {
sparkConf.get(NETWORK_ENCRYPTION_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)
sparkConf.get(Network.NETWORK_CRYPTO_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)
}
/**


@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.internal.config.Network._
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils
@ -576,26 +577,27 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
}
}
if (contains(EXECUTOR_CORES) && contains("spark.task.cpus")) {
if (contains(EXECUTOR_CORES) && contains(CPUS_PER_TASK)) {
val executorCores = get(EXECUTOR_CORES)
val taskCpus = getInt("spark.task.cpus", 1)
val taskCpus = get(CPUS_PER_TASK)
if (executorCores < taskCpus) {
throw new SparkException(s"${EXECUTOR_CORES.key} must not be less than spark.task.cpus.")
throw new SparkException(
s"${EXECUTOR_CORES.key} must not be less than ${CPUS_PER_TASK.key}.")
}
}
val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED)
val encryptionEnabled = get(NETWORK_CRYPTO_ENABLED) || get(SASL_ENCRYPTION_ENABLED)
require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
val executorTimeoutThresholdMs =
getTimeAsSeconds("spark.network.timeout", "120s") * 1000
val executorTimeoutThresholdMs = get(NETWORK_TIMEOUT) * 1000
val executorHeartbeatIntervalMs = get(EXECUTOR_HEARTBEAT_INTERVAL)
val networkTimeout = NETWORK_TIMEOUT.key
// If spark.executor.heartbeatInterval bigger than spark.network.timeout,
// it will almost always cause ExecutorLostFailure. See SPARK-22754.
require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " +
s"spark.network.timeout=${executorTimeoutThresholdMs}ms must be greater than the value of " +
s"${networkTimeout}=${executorTimeoutThresholdMs}ms must be no less than the value of " +
s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.")
}
@ -680,13 +682,13 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),
IO_COMPRESSION_LZ4_BLOCKSIZE.key -> Seq(
AlternateConfig("spark.io.compression.lz4.block.size", "1.4")),
"spark.rpc.numRetries" -> Seq(
RPC_NUM_RETRIES.key -> Seq(
AlternateConfig("spark.akka.num.retries", "1.4")),
"spark.rpc.retry.wait" -> Seq(
RPC_RETRY_WAIT.key -> Seq(
AlternateConfig("spark.akka.retry.wait", "1.4")),
"spark.rpc.askTimeout" -> Seq(
RPC_ASK_TIMEOUT.key -> Seq(
AlternateConfig("spark.akka.askTimeout", "1.4")),
"spark.rpc.lookupTimeout" -> Seq(
RPC_LOOKUP_TIMEOUT.key -> Seq(
AlternateConfig("spark.akka.lookupTimeout", "1.4")),
"spark.streaming.fileStream.minRememberDuration" -> Seq(
AlternateConfig("spark.streaming.minRememberDuration", "1.5")),
@ -694,7 +696,7 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
MEMORY_OFFHEAP_ENABLED.key -> Seq(
AlternateConfig("spark.unsafe.offHeap", "1.6")),
"spark.rpc.message.maxSize" -> Seq(
RPC_MESSAGE_MAX_SIZE.key -> Seq(
AlternateConfig("spark.akka.frameSize", "1.6")),
"spark.yarn.jars" -> Seq(
AlternateConfig("spark.yarn.jar", "2.0")),


@ -553,7 +553,7 @@ class SparkContext(config: SparkConf) extends Logging {
_executorAllocationManager.foreach(_.start())
_cleaner =
if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
Some(new ContextCleaner(this))
} else {
None
@ -2538,6 +2538,7 @@ object SparkContext extends Logging {
private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
private[spark] val SPARK_SCHEDULER_POOL = "spark.scheduler.pool"
private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope"
private[spark] val RDD_SCOPE_NO_OVERRIDE_KEY = "spark.rdd.scope.noOverride"


@ -417,8 +417,8 @@ object SparkEnv extends Logging {
// Spark properties
// This includes the scheduling mode whether or not it is configured (used by SparkUI)
val schedulerMode =
if (!conf.contains("spark.scheduler.mode")) {
Seq(("spark.scheduler.mode", schedulingMode))
if (!conf.contains(SCHEDULER_MODE)) {
Seq((SCHEDULER_MODE.key, schedulingMode))
} else {
Seq.empty[(String, String)]
}


@ -28,6 +28,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.util.{SparkExitCode, ThreadUtils, Utils}
@ -226,8 +227,8 @@ private[spark] class ClientApp extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
val driverArgs = new ClientArguments(args)
if (!conf.contains("spark.rpc.askTimeout")) {
conf.set("spark.rpc.askTimeout", "10s")
if (!conf.contains(RPC_ASK_TIMEOUT)) {
conf.set(RPC_ASK_TIMEOUT, "10s")
}
Logger.getRootLogger.setLevel(driverArgs.logLevel)


@ -32,6 +32,7 @@ import scala.util.Try
import org.apache.spark.{SparkException, SparkUserAppException}
import org.apache.spark.deploy.SparkSubmitAction._
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.DYN_ALLOCATION_ENABLED
import org.apache.spark.launcher.SparkSubmitArgumentsParser
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.Utils
@ -208,7 +209,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
.orElse(sparkProperties.get("spark.yarn.principal"))
.orNull
dynamicAllocationEnabled =
sparkProperties.get("spark.dynamicAllocation.enabled").exists("true".equalsIgnoreCase)
sparkProperties.get(DYN_ALLOCATION_ENABLED.key).exists("true".equalsIgnoreCase)
// Try to set main class from JAR if no --class argument is given
if (mainClass == null && !isPython && !isR && primaryResource != null) {


@ -124,7 +124,7 @@ private[spark] class Executor(
private val userClassPathFirst = conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)
// Whether to monitor killed / interrupted tasks
private val taskReaperEnabled = conf.getBoolean("spark.task.reaper.enabled", false)
private val taskReaperEnabled = conf.get(TASK_REAPER_ENABLED)
// Create our ClassLoader
// do this after SparkEnv creation so can access the SecurityManager
@ -163,7 +163,7 @@ private[spark] class Executor(
// Max size of direct result. If task result is bigger than this, we use the block manager
// to send the result back.
private val maxDirectResultSize = Math.min(
conf.getSizeAsBytes("spark.task.maxDirectResultSize", 1L << 20),
conf.get(TASK_MAX_DIRECT_RESULT_SIZE),
RpcUtils.maxMessageSizeBytes(conf))
private val maxResultSize = conf.get(MAX_RESULT_SIZE)
@ -667,13 +667,11 @@ private[spark] class Executor(
private[this] val taskId: Long = taskRunner.taskId
private[this] val killPollingIntervalMs: Long =
conf.getTimeAsMs("spark.task.reaper.pollingInterval", "10s")
private[this] val killPollingIntervalMs: Long = conf.get(TASK_REAPER_POLLING_INTERVAL)
private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.reaper.killTimeout", "-1")
private[this] val killTimeoutMs: Long = conf.get(TASK_REAPER_KILL_TIMEOUT)
private[this] val takeThreadDump: Boolean =
conf.getBoolean("spark.task.reaper.threadDump", true)
private[this] val takeThreadDump: Boolean = conf.get(TASK_REAPER_THREAD_DUMP)
override def run(): Unit = {
val startTimeMs = System.currentTimeMillis()


@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.internal.config
import java.util.concurrent.TimeUnit
private[spark] object Network {
private[spark] val NETWORK_CRYPTO_SASL_FALLBACK =
ConfigBuilder("spark.network.crypto.saslFallback")
.booleanConf
.createWithDefault(true)
private[spark] val NETWORK_CRYPTO_ENABLED =
ConfigBuilder("spark.network.crypto.enabled")
.booleanConf
.createWithDefault(false)
private[spark] val NETWORK_REMOTE_READ_NIO_BUFFER_CONVERSION =
ConfigBuilder("spark.network.remoteReadNioBufferConversion")
.booleanConf
.createWithDefault(false)
private[spark] val NETWORK_TIMEOUT =
ConfigBuilder("spark.network.timeout")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("120s")
private[spark] val NETWORK_TIMEOUT_INTERVAL =
ConfigBuilder("spark.network.timeoutInterval")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString(STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL.defaultValueString)
private[spark] val RPC_ASK_TIMEOUT =
ConfigBuilder("spark.rpc.askTimeout")
.stringConf
.createOptional
private[spark] val RPC_CONNECT_THREADS =
ConfigBuilder("spark.rpc.connect.threads")
.intConf
.createWithDefault(64)
private[spark] val RPC_IO_NUM_CONNECTIONS_PER_PEER =
ConfigBuilder("spark.rpc.io.numConnectionsPerPeer")
.intConf
.createWithDefault(1)
private[spark] val RPC_IO_THREADS =
ConfigBuilder("spark.rpc.io.threads")
.intConf
.createOptional
private[spark] val RPC_LOOKUP_TIMEOUT =
ConfigBuilder("spark.rpc.lookupTimeout")
.stringConf
.createOptional
private[spark] val RPC_MESSAGE_MAX_SIZE =
ConfigBuilder("spark.rpc.message.maxSize")
.intConf
.createWithDefault(128)
private[spark] val RPC_NETTY_DISPATCHER_NUM_THREADS =
ConfigBuilder("spark.rpc.netty.dispatcher.numThreads")
.intConf
.createOptional
private[spark] val RPC_NUM_RETRIES =
ConfigBuilder("spark.rpc.numRetries")
.intConf
.createWithDefault(3)
private[spark] val RPC_RETRY_WAIT =
ConfigBuilder("spark.rpc.retry.wait")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("3s")
}
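
A rough usage note, not part of the diff: `ConfigEntry` reads on `SparkConf` are `private[spark]`, so the sketch below assumes it sits inside Spark's own code base. Entries built with `createOptional` (such as `RPC_IO_THREADS`) come back as an `Option`, which is why the `Dispatcher` and `NettyRpcEnv` call sites below chain `.getOrElse(...)`. The object and method names here are invented for the example:

```scala
package org.apache.spark.rpc.netty

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Network._

private[spark] object NetworkConfigSketch {
  def resolve(conf: SparkConf, numUsableCores: Int): (Int, Long, Int) = {
    val retries     = conf.get(RPC_NUM_RETRIES)                          // Int, default 3
    val retryWaitMs = conf.get(RPC_RETRY_WAIT)                           // Long millis, default "3s" -> 3000
    val ioThreads   = conf.get(RPC_IO_THREADS).getOrElse(numUsableCores) // Option[Int] from createOptional
    (retries, retryWaitMs, ioThreads)
  }
}
```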


@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.EventLoggingListener
import org.apache.spark.scheduler.{EventLoggingListener, SchedulingMode}
import org.apache.spark.storage.{DefaultTopologyMapper, RandomBlockReplicationPolicy}
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.util.Utils
@ -265,11 +265,22 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")
private[spark] val STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT =
ConfigBuilder("spark.storage.blockManagerSlaveTimeoutMs")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString(Network.NETWORK_TIMEOUT.defaultValueString)
private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
.booleanConf.createWithDefault(false)
private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.createWithDefault(1)
private[spark] val DYN_ALLOCATION_ENABLED =
ConfigBuilder("spark.dynamicAllocation.enabled").booleanConf.createWithDefault(false)
private[spark] val DYN_ALLOCATION_TESTING =
ConfigBuilder("spark.dynamicAllocation.testing").booleanConf.createWithDefault(false)
private[spark] val DYN_ALLOCATION_MIN_EXECUTORS =
ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.createWithDefault(0)
@ -284,6 +295,22 @@ package object config {
ConfigBuilder("spark.dynamicAllocation.executorAllocationRatio")
.doubleConf.createWithDefault(1.0)
private[spark] val DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT =
ConfigBuilder("spark.dynamicAllocation.cachedExecutorIdleTimeout")
.timeConf(TimeUnit.SECONDS).createWithDefault(Integer.MAX_VALUE)
private[spark] val DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT =
ConfigBuilder("spark.dynamicAllocation.executorIdleTimeout")
.timeConf(TimeUnit.SECONDS).createWithDefault(60)
private[spark] val DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT =
ConfigBuilder("spark.dynamicAllocation.schedulerBacklogTimeout")
.timeConf(TimeUnit.SECONDS).createWithDefault(1)
private[spark] val DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT =
ConfigBuilder("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout")
.fallbackConf(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT)
private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("3s")
@ -316,11 +343,36 @@ package object config {
.toSequence
.createWithDefault(Nil)
private[spark] val MAX_TASK_FAILURES =
private[spark] val TASK_MAX_DIRECT_RESULT_SIZE =
ConfigBuilder("spark.task.maxDirectResultSize")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(1L << 20)
private[spark] val TASK_MAX_FAILURES =
ConfigBuilder("spark.task.maxFailures")
.intConf
.createWithDefault(4)
private[spark] val TASK_REAPER_ENABLED =
ConfigBuilder("spark.task.reaper.enabled")
.booleanConf
.createWithDefault(false)
private[spark] val TASK_REAPER_KILL_TIMEOUT =
ConfigBuilder("spark.task.reaper.killTimeout")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(-1)
private[spark] val TASK_REAPER_POLLING_INTERVAL =
ConfigBuilder("spark.task.reaper.pollingInterval")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10s")
private[spark] val TASK_REAPER_THREAD_DUMP =
ConfigBuilder("spark.task.reaper.threadDump")
.booleanConf
.createWithDefault(true)
// Blacklist confs
private[spark] val BLACKLIST_ENABLED =
ConfigBuilder("spark.blacklist.enabled")
@ -574,11 +626,6 @@ package object config {
"secret keys are only allowed when using Kubernetes.")
.fallbackConf(AUTH_SECRET_FILE)
private[spark] val NETWORK_ENCRYPTION_ENABLED =
ConfigBuilder("spark.network.crypto.enabled")
.booleanConf
.createWithDefault(false)
private[spark] val BUFFER_WRITE_CHUNK_SIZE =
ConfigBuilder("spark.buffer.write.chunkSize")
.internal()
@ -930,6 +977,31 @@ package object config {
.toSequence
.createWithDefault(Nil)
private[spark] val CLEANER_PERIODIC_GC_INTERVAL =
ConfigBuilder("spark.cleaner.periodicGC.interval")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("30min")
private[spark] val CLEANER_REFERENCE_TRACKING =
ConfigBuilder("spark.cleaner.referenceTracking")
.booleanConf
.createWithDefault(true)
private[spark] val CLEANER_REFERENCE_TRACKING_BLOCKING =
ConfigBuilder("spark.cleaner.referenceTracking.blocking")
.booleanConf
.createWithDefault(true)
private[spark] val CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE =
ConfigBuilder("spark.cleaner.referenceTracking.blocking.shuffle")
.booleanConf
.createWithDefault(false)
private[spark] val CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS =
ConfigBuilder("spark.cleaner.referenceTracking.cleanCheckpoints")
.booleanConf
.createWithDefault(false)
private[spark] val EXECUTOR_LOGS_ROLLING_STRATEGY =
ConfigBuilder("spark.executor.logs.rolling.strategy").stringConf.createWithDefault("")
@ -1103,4 +1175,49 @@ package object config {
.stringConf
.toSequence
.createWithDefault(Nil)
private[spark] val SCHEDULER_ALLOCATION_FILE =
ConfigBuilder("spark.scheduler.allocation.file")
.stringConf
.createOptional
private[spark] val SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO =
ConfigBuilder("spark.scheduler.minRegisteredResourcesRatio")
.doubleConf
.createOptional
private[spark] val SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME =
ConfigBuilder("spark.scheduler.maxRegisteredResourcesWaitingTime")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
private[spark] val SCHEDULER_MODE =
ConfigBuilder("spark.scheduler.mode")
.stringConf
.createWithDefault(SchedulingMode.FIFO.toString)
private[spark] val SCHEDULER_REVIVE_INTERVAL =
ConfigBuilder("spark.scheduler.revive.interval")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
private[spark] val SPECULATION_ENABLED =
ConfigBuilder("spark.speculation")
.booleanConf
.createWithDefault(false)
private[spark] val SPECULATION_INTERVAL =
ConfigBuilder("spark.speculation.interval")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(100)
private[spark] val SPECULATION_MULTIPLIER =
ConfigBuilder("spark.speculation.multiplier")
.doubleConf
.createWithDefault(1.5)
private[spark] val SPECULATION_QUANTILE =
ConfigBuilder("spark.speculation.quantile")
.doubleConf
.createWithDefault(0.75)
}
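
A brief, hedged illustration of how the new typed entries above behave, mirroring the `TaskSchedulerImpl` and `TaskSetManager` call sites later in the diff (the wrapper object is invented for the sketch; `ConfigEntry` reads are `private[spark]`):

```scala
package org.apache.spark.internal.config

import org.apache.spark.SparkConf

private[spark] object SpeculationConfigSketch {
  def summarize(conf: SparkConf): String = {
    val enabled    = conf.get(SPECULATION_ENABLED)    // Boolean, default false
    val intervalMs = conf.get(SPECULATION_INTERVAL)   // Long millis; e.g. "spark.speculation.interval=1s" parses to 1000
    val quantile   = conf.get(SPECULATION_QUANTILE)   // Double, default 0.75
    s"speculation=$enabled interval=${intervalMs}ms quantile=$quantile"
  }
}
```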


@ -36,6 +36,7 @@ import org.apache.spark._
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.SPECULATION_ENABLED
import org.apache.spark.internal.io._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
@ -1051,7 +1052,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// When speculation is on and output committer class name contains "Direct", we should warn
// users that they may lose data if they are using a direct output committer.
val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
val speculationEnabled = self.conf.get(SPECULATION_ENABLED)
val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
val warningMessage =


@ -36,6 +36,7 @@ import org.apache.spark.Partitioner._
import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.RDD_LIMIT_SCALE_UP_FACTOR
import org.apache.spark.partial.BoundedDouble
import org.apache.spark.partial.CountEvaluator
@ -1591,8 +1592,8 @@ abstract class RDD[T: ClassTag](
* The checkpoint directory set through `SparkContext#setCheckpointDir` is not used.
*/
def localCheckpoint(): this.type = RDDCheckpointData.synchronized {
if (conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
conf.contains("spark.dynamicAllocation.cachedExecutorIdleTimeout")) {
if (conf.get(DYN_ALLOCATION_ENABLED) &&
conf.contains(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)) {
logWarning("Local checkpointing is NOT safe to use with dynamic allocation, " +
"which removes executors along with their cached blocks. If you must use both " +
"features, you are advised to set `spark.dynamicAllocation.cachedExecutorIdleTimeout` " +


@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS
/**
* An implementation of checkpointing that writes the RDD data to reliable storage.
@ -58,7 +59,7 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v
val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
// Optionally clean our checkpoint files if the reference is out of scope
if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
if (rdd.conf.get(CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS)) {
rdd.context.cleaner.foreach { cleaner =>
cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
}


@ -26,6 +26,7 @@ import scala.util.control.NonFatal
import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Network.RPC_NETTY_DISPATCHER_NUM_THREADS
import org.apache.spark.network.client.RpcResponseCallback
import org.apache.spark.rpc._
import org.apache.spark.util.ThreadUtils
@ -197,8 +198,8 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
private val threadpool: ThreadPoolExecutor = {
val availableCores =
if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
math.max(2, availableCores))
val numThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
.getOrElse(math.max(2, availableCores))
val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
for (i <- 0 until numThreads) {
pool.execute(new MessageLoop)


@ -31,6 +31,7 @@ import scala.util.control.NonFatal
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Network._
import org.apache.spark.network.TransportContext
import org.apache.spark.network.client._
import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
@ -48,9 +49,9 @@ private[netty] class NettyRpcEnv(
numUsableCores: Int) extends RpcEnv(conf) with Logging {
private[netty] val transportConf = SparkTransportConf.fromSparkConf(
conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
conf.clone.set(RPC_IO_NUM_CONNECTIONS_PER_PEER, 1),
"rpc",
conf.getInt("spark.rpc.io.threads", numUsableCores))
conf.get(RPC_IO_THREADS).getOrElse(numUsableCores))
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
@ -87,7 +88,7 @@ private[netty] class NettyRpcEnv(
// TODO: a non-blocking TransportClientFactory.createClient in future
private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
"netty-rpc-connection",
conf.getInt("spark.rpc.connect.threads", 64))
conf.get(RPC_CONNECT_THREADS))
@volatile private var server: TransportServer = _


@ -18,6 +18,7 @@
package org.apache.spark.scheduler
import org.apache.spark.SparkException
import org.apache.spark.internal.config.DYN_ALLOCATION_ENABLED
/**
* Exception thrown when submit a job with barrier stage(s) failing a required check.
@ -51,7 +52,7 @@ private[spark] object BarrierJobAllocationFailed {
val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION =
"[SPARK-24942]: Barrier execution mode does not support dynamic resource allocation for " +
"now. You can disable dynamic resource allocation by setting Spark conf " +
"\"spark.dynamicAllocation.enabled\" to \"false\"."
s""""${DYN_ALLOCATION_ENABLED.key}" to "false"."""
// Error message when running a barrier stage that requires more slots than current total number.
val ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =


@ -460,15 +460,15 @@ private[spark] object BlacklistTracker extends Logging {
}
}
val maxTaskFailures = conf.get(config.MAX_TASK_FAILURES)
val maxTaskFailures = conf.get(config.TASK_MAX_FAILURES)
val maxNodeAttempts = conf.get(config.MAX_TASK_ATTEMPTS_PER_NODE)
if (maxNodeAttempts >= maxTaskFailures) {
throw new IllegalArgumentException(s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key} " +
s"( = ${maxNodeAttempts}) was >= ${config.MAX_TASK_FAILURES.key} " +
s"( = ${maxNodeAttempts}) was >= ${config.TASK_MAX_FAILURES.key} " +
s"( = ${maxTaskFailures} ). Though blacklisting is enabled, with this configuration, " +
s"Spark will not be robust to one bad node. Decrease " +
s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase ${config.MAX_TASK_FAILURES.key}, " +
s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase ${config.TASK_MAX_FAILURES.key}, " +
s"or disable blacklisting with ${config.BLACKLIST_ENABLED.key}")
}
}


@ -23,8 +23,9 @@ import java.util.{Locale, NoSuchElementException, Properties}
import scala.util.control.NonFatal
import scala.xml.{Node, XML}
import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.util.Utils
@ -56,10 +57,9 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
extends SchedulableBuilder with Logging {
val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file"
val schedulerAllocFile = conf.getOption(SCHEDULER_ALLOCATION_FILE_PROPERTY)
val schedulerAllocFile = conf.get(SCHEDULER_ALLOCATION_FILE)
val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
val FAIR_SCHEDULER_PROPERTIES = SparkContext.SPARK_SCHEDULER_POOL
val DEFAULT_POOL_NAME = "default"
val MINIMUM_SHARES_PROPERTY = "minShare"
val SCHEDULING_MODE_PROPERTY = "schedulingMode"
@ -85,7 +85,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
} else {
logWarning("Fair Scheduler configuration file not found so jobs will be scheduled in " +
s"FIFO order. To use fair scheduling, configure pools in $DEFAULT_SCHEDULER_FILE or " +
s"set $SCHEDULER_ALLOCATION_FILE_PROPERTY to a file that contains the configuration.")
s"set ${SCHEDULER_ALLOCATION_FILE.key} to a file that contains the configuration.")
None
}
}


@ -31,6 +31,7 @@ import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.internal.config._
import org.apache.spark.rpc.RpcEndpoint
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
@ -61,7 +62,7 @@ private[spark] class TaskSchedulerImpl(
import TaskSchedulerImpl._
def this(sc: SparkContext) = {
this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
this(sc, sc.conf.get(config.TASK_MAX_FAILURES))
}
// Lazily initializing blacklistTrackerOpt to avoid getting empty ExecutorAllocationClient,
@ -71,7 +72,7 @@ private[spark] class TaskSchedulerImpl(
val conf = sc.conf
// How often to check for speculative tasks
val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")
val SPECULATION_INTERVAL_MS = conf.get(SPECULATION_INTERVAL)
// Duplicate copies of a task will only be launched if the original copy has been running for
// at least this amount of time. This is to avoid the overhead of launching speculative copies
@ -85,7 +86,7 @@ private[spark] class TaskSchedulerImpl(
val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")
// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
val CPUS_PER_TASK = conf.get(config.CPUS_PER_TASK)
// TaskSetManagers are not thread safe, so any access to one should be synchronized
// on this class.
@ -131,7 +132,7 @@ private[spark] class TaskSchedulerImpl(
private var schedulableBuilder: SchedulableBuilder = null
// default scheduler is FIFO
private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)
private val schedulingModeConf = conf.get(SCHEDULER_MODE)
val schedulingMode: SchedulingMode =
try {
SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
@ -183,7 +184,7 @@ private[spark] class TaskSchedulerImpl(
override def start() {
backend.start()
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
if (!isLocal && conf.get(SPECULATION_ENABLED)) {
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleWithFixedDelay(new Runnable {
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
@ -857,7 +858,7 @@ private[spark] class TaskSchedulerImpl(
private[spark] object TaskSchedulerImpl {
val SCHEDULER_MODE_PROPERTY = "spark.scheduler.mode"
val SCHEDULER_MODE_PROPERTY = SCHEDULER_MODE.key
/**
* Used to balance containers across hosts.


@ -28,6 +28,7 @@ import scala.util.control.NonFatal
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.SchedulingMode._
import org.apache.spark.util.{AccumulatorV2, Clock, LongAccumulator, SystemClock, Utils}
import org.apache.spark.util.collection.MedianHeap
@ -61,12 +62,12 @@ private[spark] class TaskSetManager(
private val addedFiles = HashMap[String, Long](sched.sc.addedFiles.toSeq: _*)
// Quantile of tasks at which to start speculation
val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)
val speculationQuantile = conf.get(SPECULATION_QUANTILE)
val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
val maxResultSize = conf.get(config.MAX_RESULT_SIZE)
val speculationEnabled = conf.getBoolean("spark.speculation", false)
val speculationEnabled = conf.get(SPECULATION_ENABLED)
// Serializer for closures and tasks.
val env = SparkEnv.get
@ -1015,13 +1016,13 @@ private[spark] class TaskSetManager(
return false
}
var foundTasks = false
val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
val minFinishedForSpeculation = (speculationQuantile * numTasks).floor.toInt
logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
val time = clock.getTimeMillis()
val medianDuration = successfulTaskDurations.median
val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
val threshold = max(speculationMultiplier * medianDuration, minTimeToSpeculation)
// TODO: Threshold should also look at standard deviation of task durations and have a lower
// bound based on that.
logDebug("Task length threshold for speculation: " + threshold)


@ -30,6 +30,8 @@ import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, Tas
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Network._
import org.apache.spark.rpc._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@ -58,11 +60,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, that is double between 0 and 1.
private val _minRegisteredRatio =
math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
math.min(1, conf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).getOrElse(0.0))
// Submit tasks after maxRegisteredWaitingTime milliseconds
// if minRegisteredRatio has not yet been reached
private val maxRegisteredWaitingTimeMs =
conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
conf.get(SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME)
private val createTime = System.currentTimeMillis()
// Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
@ -118,7 +120,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override def onStart() {
// Periodically revive offers to allow delay scheduling to work
val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
val reviveIntervalMs = conf.get(SCHEDULER_REVIVE_INTERVAL).getOrElse(1000L)
reviveThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
@ -301,8 +303,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.rpc.message.maxSize (%d bytes). Consider increasing " +
"spark.rpc.message.maxSize or using broadcast variables for large values."
s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {


@ -25,8 +25,9 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import org.apache.spark._
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.CPUS_PER_TASK
import org.apache.spark.internal.config.Status._
import org.apache.spark.scheduler._
import org.apache.spark.status.api.v1
@ -151,7 +152,7 @@ private[spark] class AppStatusListener(
details.getOrElse("System Properties", Nil),
details.getOrElse("Classpath Entries", Nil))
coresPerTask = envInfo.sparkProperties.toMap.get("spark.task.cpus").map(_.toInt)
coresPerTask = envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toInt)
.getOrElse(coresPerTask)
kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo))
@ -434,7 +435,7 @@ private[spark] class AppStatusListener(
val stage = getOrCreateStage(event.stageInfo)
stage.status = v1.StageStatus.ACTIVE
stage.schedulingPool = Option(event.properties).flatMap { p =>
Option(p.getProperty("spark.scheduler.pool"))
Option(p.getProperty(SparkContext.SPARK_SCHEDULER_POOL))
}.getOrElse(SparkUI.DEFAULT_POOL_NAME)
// Look at all active jobs to find the ones that mention this stage.


@ -36,7 +36,9 @@ import com.codahale.metrics.{MetricRegistry, MetricSet}
import org.apache.spark._
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Network
import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.metrics.source.Source
import org.apache.spark.network._
@ -133,7 +135,7 @@ private[spark] class BlockManager(
private[spark] val externalShuffleServiceEnabled =
conf.get(config.SHUFFLE_SERVICE_ENABLED)
private val remoteReadNioBufferConversion =
conf.getBoolean("spark.network.remoteReadNioBufferConversion", false)
conf.get(Network.NETWORK_REMOTE_READ_NIO_BUFFER_CONVERSION)
val diskBlockManager = {
// Only perform cleanup if an external service is not serving our shuffle files.


@ -22,6 +22,7 @@ import java.util.concurrent.Semaphore
import scala.util.Random
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.config.SCHEDULER_MODE
import org.apache.spark.scheduler.SchedulingMode
// scalastyle:off
@ -50,14 +51,14 @@ private[spark] object UIWorkloadGenerator {
val schedulingMode = SchedulingMode.withName(args(1))
if (schedulingMode == SchedulingMode.FAIR) {
conf.set("spark.scheduler.mode", "FAIR")
conf.set(SCHEDULER_MODE, "FAIR")
}
val nJobSet = args(2).toInt
val sc = new SparkContext(conf)
def setProperties(s: String): Unit = {
if (schedulingMode == SchedulingMode.FAIR) {
sc.setLocalProperty("spark.scheduler.pool", s)
sc.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, s)
}
sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
}


@ -28,6 +28,7 @@ import scala.xml._
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.JobExecutionStatus
import org.apache.spark.internal.config.SCHEDULER_MODE
import org.apache.spark.scheduler._
import org.apache.spark.status.AppStatusStore
import org.apache.spark.status.api.v1
@ -295,7 +296,7 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We
}
val schedulingMode = store.environmentInfo().sparkProperties.toMap
.get("spark.scheduler.mode")
.get(SCHEDULER_MODE.key)
.map { mode => SchedulingMode.withName(mode).toString }
.getOrElse("Unknown")


@ -22,6 +22,7 @@ import javax.servlet.http.HttpServletRequest
import scala.collection.JavaConverters._
import org.apache.spark.JobExecutionStatus
import org.apache.spark.internal.config.SCHEDULER_MODE
import org.apache.spark.scheduler.SchedulingMode
import org.apache.spark.status.AppStatusStore
import org.apache.spark.ui._
@ -37,7 +38,7 @@ private[ui] class JobsTab(parent: SparkUI, store: AppStatusStore)
store
.environmentInfo()
.sparkProperties
.contains(("spark.scheduler.mode", SchedulingMode.FAIR.toString))
.contains((SCHEDULER_MODE.key, SchedulingMode.FAIR.toString))
}
def getSparkUser: String = parent.getSparkUser


@ -19,6 +19,7 @@ package org.apache.spark.ui.jobs
import javax.servlet.http.HttpServletRequest
import org.apache.spark.internal.config.SCHEDULER_MODE
import org.apache.spark.scheduler.SchedulingMode
import org.apache.spark.status.AppStatusStore
import org.apache.spark.status.api.v1.StageStatus
@ -40,7 +41,7 @@ private[ui] class StagesTab(val parent: SparkUI, val store: AppStatusStore)
store
.environmentInfo()
.sparkProperties
.contains(("spark.scheduler.mode", SchedulingMode.FAIR.toString))
.contains((SCHEDULER_MODE.key, SchedulingMode.FAIR.toString))
}
def handleKillRequest(request: HttpServletRequest): Unit = {


@ -19,6 +19,7 @@ package org.apache.spark.util
import org.apache.spark.SparkConf
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Network._
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}
private[spark] object RpcUtils {
@ -35,32 +36,32 @@ private[spark] object RpcUtils {
/** Returns the configured number of times to retry connecting */
def numRetries(conf: SparkConf): Int = {
conf.getInt("spark.rpc.numRetries", 3)
conf.get(RPC_NUM_RETRIES)
}
/** Returns the configured number of milliseconds to wait on each retry */
def retryWaitMs(conf: SparkConf): Long = {
conf.getTimeAsMs("spark.rpc.retry.wait", "3s")
conf.get(RPC_RETRY_WAIT)
}
/** Returns the default Spark timeout to use for RPC ask operations. */
def askRpcTimeout(conf: SparkConf): RpcTimeout = {
RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s")
RpcTimeout(conf, Seq(RPC_ASK_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s")
}
/** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
def lookupRpcTimeout(conf: SparkConf): RpcTimeout = {
RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s")
RpcTimeout(conf, Seq(RPC_LOOKUP_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s")
}
private val MAX_MESSAGE_SIZE_IN_MB = Int.MaxValue / 1024 / 1024
/** Returns the configured max message size for messages in bytes. */
def maxMessageSizeBytes(conf: SparkConf): Int = {
val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128)
val maxSizeInMB = conf.get(RPC_MESSAGE_MAX_SIZE)
if (maxSizeInMB > MAX_MESSAGE_SIZE_IN_MB) {
throw new IllegalArgumentException(
s"spark.rpc.message.maxSize should not be greater than $MAX_MESSAGE_SIZE_IN_MB MB")
s"${RPC_MESSAGE_MAX_SIZE.key} should not be greater than $MAX_MESSAGE_SIZE_IN_MB MB")
}
maxSizeInMB * 1024 * 1024
}


@ -2465,9 +2465,9 @@ private[spark] object Utils extends Logging {
* Return whether dynamic allocation is enabled in the given conf.
*/
def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
val dynamicAllocationEnabled = conf.get(DYN_ALLOCATION_ENABLED)
dynamicAllocationEnabled &&
(!isLocalMaster(conf) || conf.getBoolean("spark.dynamicAllocation.testing", false))
(!isLocalMaster(conf) || conf.get(DYN_ALLOCATION_TESTING))
}
/**


@ -20,9 +20,9 @@ package org.apache.spark
import scala.concurrent.duration._
import scala.language.postfixOps
import org.apache.spark.internal.config._
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.scheduler.BarrierJobAllocationFailed._
import org.apache.spark.scheduler.DAGScheduler
import org.apache.spark.util.ThreadUtils
/**
@ -157,8 +157,8 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
test("submit a barrier ResultStage with dynamic resource allocation enabled") {
val conf = new SparkConf()
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
.set(DYN_ALLOCATION_ENABLED, true)
.set(DYN_ALLOCATION_TESTING, true)
.setMaster("local[4]")
.setAppName("test")
sc = createSparkContext(Some(conf))
@ -172,8 +172,8 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
test("submit a barrier ShuffleMapStage with dynamic resource allocation enabled") {
val conf = new SparkConf()
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
.set(DYN_ALLOCATION_ENABLED, true)
.set(DYN_ALLOCATION_TESTING, true)
.setMaster("local[4]")
.setAppName("test")
sc = createSparkContext(Some(conf))
@ -191,9 +191,9 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
"mode") {
val conf = new SparkConf()
// Shorten the time interval between two failed checks to make the test fail faster.
.set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
.set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL.key, "1s")
// Reduce max check failures allowed to make the test fail faster.
.set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
.set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 3)
.setMaster("local[4]")
.setAppName("test")
sc = createSparkContext(Some(conf))
@ -208,9 +208,9 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
"local mode") {
val conf = new SparkConf()
// Shorten the time interval between two failed checks to make the test fail faster.
.set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
.set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL.key, "1s")
// Reduce max check failures allowed to make the test fail faster.
.set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
.set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 3)
.setMaster("local[4]")
.setAppName("test")
sc = createSparkContext(Some(conf))
@ -226,11 +226,11 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
test("submit a barrier ResultStage that requires more slots than current total under " +
"local-cluster mode") {
val conf = new SparkConf()
.set("spark.task.cpus", "2")
.set(CPUS_PER_TASK, 2)
// Shorten the time interval between two failed checks to make the test fail faster.
.set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
.set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL.key, "1s")
// Reduce max check failures allowed to make the test fail faster.
.set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
.set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 3)
.setMaster("local-cluster[4, 3, 1024]")
.setAppName("test")
sc = createSparkContext(Some(conf))
@ -244,11 +244,11 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
test("submit a barrier ShuffleMapStage that requires more slots than current total under " +
"local-cluster mode") {
val conf = new SparkConf()
.set("spark.task.cpus", "2")
.set(CPUS_PER_TASK, 2)
// Shorten the time interval between two failed checks to make the test fail faster.
.set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
.set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL.key, "1s")
// Reduce max check failures allowed to make the test fail faster.
.set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
.set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 3)
.setMaster("local-cluster[4, 3, 1024]")
.setAppName("test")
sc = createSparkContext(Some(conf))


@ -29,10 +29,10 @@ import org.scalatest.concurrent.PatienceConfiguration
import org.scalatest.time.SpanSugar._
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage._
import org.apache.spark.util.Utils
/**
* An abstract base class for context cleaner tests, which sets up a context with a config
@ -46,9 +46,9 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[So
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("ContextCleanerSuite")
.set("spark.cleaner.referenceTracking.blocking", "true")
.set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
.set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
.set(CLEANER_REFERENCE_TRACKING_BLOCKING, true)
.set(CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE, true)
.set(CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS, true)
.set(config.SHUFFLE_MANAGER, shuffleManager.getName)
before {
@ -234,7 +234,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("cleanupCheckpoint")
.set("spark.cleaner.referenceTracking.cleanCheckpoints", "false")
.set(CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS, false)
sc = new SparkContext(conf)
rdd = newPairRDD()
sc.setCheckpointDir(checkpointDir.toString)
@ -317,8 +317,8 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val conf2 = new SparkConf()
.setMaster("local-cluster[2, 1, 1024]")
.setAppName("ContextCleanerSuite")
.set("spark.cleaner.referenceTracking.blocking", "true")
.set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
.set(CLEANER_REFERENCE_TRACKING_BLOCKING, true)
.set(CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE, true)
.set(config.SHUFFLE_MANAGER, shuffleManager.getName)
sc = new SparkContext(conf2)


@ -63,19 +63,19 @@ class ExecutorAllocationManagerSuite
val conf = new SparkConf()
.setMaster("myDummyLocalExternalClusterManager")
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
.set(config.DYN_ALLOCATION_ENABLED, true)
.set(config.DYN_ALLOCATION_TESTING, true)
val sc0 = new SparkContext(conf)
contexts += sc0
assert(sc0.executorAllocationManager.isDefined)
sc0.stop()
// Min < 0
val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1")
val conf1 = conf.clone().set(config.DYN_ALLOCATION_MIN_EXECUTORS, -1)
intercept[SparkException] { contexts += new SparkContext(conf1) }
// Max < 0
val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1")
val conf2 = conf.clone().set(config.DYN_ALLOCATION_MAX_EXECUTORS, -1)
intercept[SparkException] { contexts += new SparkContext(conf2) }
// Both min and max, but min > max
@ -151,11 +151,11 @@ class ExecutorAllocationManagerSuite
val conf = new SparkConf()
.setMaster("myDummyLocalExternalClusterManager")
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
.set("spark.dynamicAllocation.maxExecutors", "15")
.set("spark.dynamicAllocation.minExecutors", "3")
.set("spark.dynamicAllocation.executorAllocationRatio", divisor.toString)
.set(config.DYN_ALLOCATION_ENABLED, true)
.set(config.DYN_ALLOCATION_TESTING, true)
.set(config.DYN_ALLOCATION_MAX_EXECUTORS, 15)
.set(config.DYN_ALLOCATION_MIN_EXECUTORS, 3)
.set(config.DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO, divisor)
.set(config.EXECUTOR_CORES, cores)
val sc = new SparkContext(conf)
contexts += sc
@ -1093,14 +1093,14 @@ class ExecutorAllocationManagerSuite
val initialExecutors = 1
val maxExecutors = 2
val conf = new SparkConf()
.set("spark.dynamicAllocation.enabled", "true")
.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
.set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
.set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
.set("spark.dynamicAllocation.initialExecutors", initialExecutors.toString)
.set("spark.dynamicAllocation.schedulerBacklogTimeout", "1000ms")
.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "1000ms")
.set("spark.dynamicAllocation.executorIdleTimeout", s"3000ms")
.set(config.DYN_ALLOCATION_ENABLED, true)
.set(config.SHUFFLE_SERVICE_ENABLED, true)
.set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors)
.set(config.DYN_ALLOCATION_MAX_EXECUTORS, maxExecutors)
.set(config.DYN_ALLOCATION_INITIAL_EXECUTORS, initialExecutors)
.set(config.DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT.key, "1000ms")
.set(config.DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key, "1000ms")
.set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "3000ms")
val mockAllocationClient = mock(classOf[ExecutorAllocationClient])
val mockBMM = mock(classOf[BlockManagerMaster])
val manager = new ExecutorAllocationManager(
@ -1155,16 +1155,16 @@ class ExecutorAllocationManagerSuite
val conf = new SparkConf()
.setMaster("myDummyLocalExternalClusterManager")
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
.set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
.set("spark.dynamicAllocation.initialExecutors", initialExecutors.toString)
.set("spark.dynamicAllocation.schedulerBacklogTimeout",
s"${schedulerBacklogTimeout.toString}s")
.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout",
.set(config.DYN_ALLOCATION_ENABLED, true)
.set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors)
.set(config.DYN_ALLOCATION_MAX_EXECUTORS, maxExecutors)
.set(config.DYN_ALLOCATION_INITIAL_EXECUTORS, initialExecutors)
.set(config.DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT.key,
s"${schedulerBacklogTimeout.toString}s")
.set(config.DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key,
s"${sustainedSchedulerBacklogTimeout.toString}s")
.set("spark.dynamicAllocation.executorIdleTimeout", s"${executorIdleTimeout.toString}s")
.set("spark.dynamicAllocation.testing", "true")
.set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, s"${executorIdleTimeout.toString}s")
.set(config.DYN_ALLOCATION_TESTING, true)
// SPARK-22864: effectively disable the allocation schedule by setting the period to a
// really long value.
.set(TEST_SCHEDULE_INTERVAL, 10000L)
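
Two set styles appear in the hunks above: plain values go through the typed overload (for example `.set(config.DYN_ALLOCATION_MAX_EXECUTORS, 15)`), while duration strings such as `"1000ms"` go through `.key`, since the time-typed entries hold a number in a fixed unit. A small sketch of the distinction, assuming code inside the `org.apache.spark` package where the `private[spark]` overloads are visible:

```scala
package org.apache.spark

import org.apache.spark.internal.config._

object DynAllocConfSketch {
  def build(): SparkConf = {
    val conf = new SparkConf(false)
      .set(DYN_ALLOCATION_MAX_EXECUTORS, 15) // compiler checks the Int against the entry's type
      .set(DYN_ALLOCATION_MIN_EXECUTORS, 3)
    // Time-typed entries parse human-readable durations, so tests that want
    // "1000ms" rather than a bare number of seconds go through the raw key.
    conf.set(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT.key, "1000ms")
  }
}
```

The typed form gives a compile-time check on the value's type; the `.key` form keeps the human-readable duration syntax the tests above rely on.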

View file

@ -28,6 +28,7 @@ import org.mockito.Mockito.{mock, spy, verify, when}
import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.config.DYN_ALLOCATION_TESTING
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@ -67,7 +68,7 @@ class HeartbeatReceiverSuite
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
.set("spark.dynamicAllocation.testing", "true")
.set(DYN_ALLOCATION_TESTING, true)
sc = spy(new SparkContext(conf))
scheduler = mock(classOf[TaskSchedulerImpl])
when(sc.taskScheduler).thenReturn(scheduler)

View file

@ -27,6 +27,7 @@ import scala.concurrent.duration._
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Deploy._
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd, SparkListenerTaskStart}
import org.apache.spark.util.ThreadUtils
@ -52,7 +53,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
}
test("local mode, FIFO scheduler") {
val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
val conf = new SparkConf().set(SCHEDULER_MODE, "FIFO")
sc = new SparkContext("local[2]", "test", conf)
testCount()
testTake()
@ -61,9 +62,9 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
}
test("local mode, fair scheduler") {
val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
val conf = new SparkConf().set(SCHEDULER_MODE, "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
conf.set("spark.scheduler.allocation.file", xmlPath)
conf.set(SCHEDULER_ALLOCATION_FILE, xmlPath)
sc = new SparkContext("local[2]", "test", conf)
testCount()
testTake()
@ -72,7 +73,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
}
test("cluster mode, FIFO scheduler") {
val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
val conf = new SparkConf().set(SCHEDULER_MODE, "FIFO")
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
testCount()
testTake()
@ -81,9 +82,9 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
}
test("cluster mode, fair scheduler") {
val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
val conf = new SparkConf().set(SCHEDULER_MODE, "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
conf.set("spark.scheduler.allocation.file", xmlPath)
conf.set(SCHEDULER_ALLOCATION_FILE, xmlPath)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
testCount()
testTake()
@ -217,8 +218,8 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
test("task reaper kills JVM if killed tasks keep running for too long") {
val conf = new SparkConf()
.set("spark.task.reaper.enabled", "true")
.set("spark.task.reaper.killTimeout", "5s")
.set(TASK_REAPER_ENABLED, true)
.set(TASK_REAPER_KILL_TIMEOUT.key, "5s")
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
// Add a listener to release the semaphore once any tasks are launched.
@ -254,9 +255,9 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
test("task reaper will not kill JVM if spark.task.killTimeout == -1") {
val conf = new SparkConf()
.set("spark.task.reaper.enabled", "true")
.set("spark.task.reaper.killTimeout", "-1")
.set("spark.task.reaper.PollingInterval", "1s")
.set(TASK_REAPER_ENABLED, true)
.set(TASK_REAPER_KILL_TIMEOUT.key, "-1")
.set(TASK_REAPER_POLLING_INTERVAL.key, "1s")
.set(MAX_EXECUTOR_RETRIES, 1)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)

View file

@ -25,6 +25,7 @@ import org.mockito.Mockito._
import org.apache.spark.LocalSparkContext._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Network.{RPC_ASK_TIMEOUT, RPC_MESSAGE_MAX_SIZE}
import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv}
import org.apache.spark.scheduler.{CompressedMapStatus, MapStatus}
import org.apache.spark.shuffle.FetchFailedException
@ -170,8 +171,8 @@ class MapOutputTrackerSuite extends SparkFunSuite {
test("remote fetch below max RPC message size") {
val newConf = new SparkConf
newConf.set("spark.rpc.message.maxSize", "1")
newConf.set("spark.rpc.askTimeout", "1") // Fail fast
newConf.set(RPC_MESSAGE_MAX_SIZE, 1)
newConf.set(RPC_ASK_TIMEOUT, "1") // Fail fast
newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, 1048576L)
val masterTracker = newTrackerMaster(newConf)
@ -199,8 +200,8 @@ class MapOutputTrackerSuite extends SparkFunSuite {
test("min broadcast size exceeds max RPC message size") {
val newConf = new SparkConf
newConf.set("spark.rpc.message.maxSize", "1")
newConf.set("spark.rpc.askTimeout", "1") // Fail fast
newConf.set(RPC_MESSAGE_MAX_SIZE, 1)
newConf.set(RPC_ASK_TIMEOUT, "1") // Fail fast
newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, Int.MaxValue.toLong)
intercept[IllegalArgumentException] { newTrackerMaster(newConf) }
@ -243,8 +244,8 @@ class MapOutputTrackerSuite extends SparkFunSuite {
test("remote fetch using broadcast") {
val newConf = new SparkConf
newConf.set("spark.rpc.message.maxSize", "1")
newConf.set("spark.rpc.askTimeout", "1") // Fail fast
newConf.set(RPC_MESSAGE_MAX_SIZE, 1)
newConf.set(RPC_ASK_TIMEOUT, "1") // Fail fast
newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, 10240L) // 10 KiB << 1MiB framesize
// needs TorrentBroadcast so need a SparkContext
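
In the MapOutputTracker hunks above, `RPC_MESSAGE_MAX_SIZE` now takes an `Int` (a size in MB) while `RPC_ASK_TIMEOUT` still takes a string, because the two entries carry different value types and the typed overload enforces that at compile time. A usage sketch, assuming code in the `org.apache.spark` package:

```scala
package org.apache.spark

import org.apache.spark.internal.config.Network.{RPC_ASK_TIMEOUT, RPC_MESSAGE_MAX_SIZE}

object RpcConfSketch {
  def smallRpcConf(): SparkConf = {
    val conf = new SparkConf(false)
    conf.set(RPC_MESSAGE_MAX_SIZE, 1) // Int-valued entry: max RPC message size, in MB
    conf.set(RPC_ASK_TIMEOUT, "1")    // string-valued entry: short timeout so tests fail fast
    // conf.set(RPC_MESSAGE_MAX_SIZE, "1")  // would not compile: this entry expects an Int
    conf
  }
}
```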

View file

@ -29,6 +29,7 @@ import com.esotericsoftware.kryo.Kryo
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.internal.config.Network._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer}
import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
@ -142,7 +143,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
test("creating SparkContext with cpus per tasks bigger than cores per executors") {
val conf = new SparkConf(false)
.set(EXECUTOR_CORES, 1)
.set("spark.task.cpus", "2")
.set(CPUS_PER_TASK, 2)
intercept[SparkException] { sc = new SparkContext(conf) }
}
@ -268,10 +269,10 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
test("akka deprecated configs") {
val conf = new SparkConf()
assert(!conf.contains("spark.rpc.numRetries"))
assert(!conf.contains("spark.rpc.retry.wait"))
assert(!conf.contains("spark.rpc.askTimeout"))
assert(!conf.contains("spark.rpc.lookupTimeout"))
assert(!conf.contains(RPC_NUM_RETRIES))
assert(!conf.contains(RPC_RETRY_WAIT))
assert(!conf.contains(RPC_ASK_TIMEOUT))
assert(!conf.contains(RPC_LOOKUP_TIMEOUT))
conf.set("spark.akka.num.retries", "1")
assert(RpcUtils.numRetries(conf) === 1)
@ -322,12 +323,12 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
val conf = new SparkConf()
conf.validateSettings()
conf.set(NETWORK_ENCRYPTION_ENABLED, true)
conf.set(NETWORK_CRYPTO_ENABLED, true)
intercept[IllegalArgumentException] {
conf.validateSettings()
}
conf.set(NETWORK_ENCRYPTION_ENABLED, false)
conf.set(NETWORK_CRYPTO_ENABLED, false)
conf.set(SASL_ENCRYPTION_ENABLED, true)
intercept[IllegalArgumentException] {
conf.validateSettings()
@ -341,7 +342,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
val conf = new SparkConf()
conf.validateSettings()
conf.set("spark.network.timeout", "5s")
conf.set(NETWORK_TIMEOUT.key, "5s")
intercept[IllegalArgumentException] {
conf.validateSettings()
}
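
The SparkConfSuite hunks above also show two smaller conveniences: related entries are grouped under category objects (`config.Network`, `config.Kryo`, `config.History`) so suites import only what they use, and `SparkConf` helpers such as `contains` accept an entry directly. A minimal round-trip sketch, again assuming the `org.apache.spark` package:

```scala
package org.apache.spark

import org.apache.spark.internal.config.Network.RPC_NUM_RETRIES

object EntryAwareConfSketch {
  def roundTrip(): Boolean = {
    val conf = new SparkConf(false)
    conf.set(RPC_NUM_RETRIES, 1)                  // typed set
    val present = conf.contains(RPC_NUM_RETRIES)  // contains by entry, no raw key string
    conf.remove(RPC_NUM_RETRIES)                  // remove by entry as well
    present && !conf.contains(RPC_NUM_RETRIES)
  }
}
```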

View file

@ -423,7 +423,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
test("No exception when both num-executors and dynamic allocation set.") {
noException should be thrownBy {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")
.set("spark.dynamicAllocation.enabled", "true").set("spark.executor.instances", "6"))
.set(DYN_ALLOCATION_ENABLED, true).set("spark.executor.instances", "6"))
assert(sc.executorAllocationManager.isEmpty)
assert(sc.getConf.getInt("spark.executor.instances", 0) === 6)
}

View file

@ -195,7 +195,7 @@ class SparkSubmitSuite
"--name", "myApp",
"--class", "Foo",
"--num-executors", "0",
"--conf", "spark.dynamicAllocation.enabled=true",
"--conf", s"${DYN_ALLOCATION_ENABLED.key}=true",
"thejar.jar")
new SparkSubmitArguments(clArgs1)
@ -203,7 +203,7 @@ class SparkSubmitSuite
"--name", "myApp",
"--class", "Foo",
"--num-executors", "0",
"--conf", "spark.dynamicAllocation.enabled=false",
"--conf", s"${DYN_ALLOCATION_ENABLED.key}=false",
"thejar.jar")
val e = intercept[SparkException](new SparkSubmitArguments(clArgs2))
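
`spark-submit` arguments remain plain strings, so the suite interpolates `DYN_ALLOCATION_ENABLED.key` instead of hard-coding the key. A sketch of the pattern, with the argument values copied from the hunk above; it assumes the `org.apache.spark` package since the entry is `private[spark]`:

```scala
package org.apache.spark

import org.apache.spark.internal.config.DYN_ALLOCATION_ENABLED

object SubmitArgsSketch {
  // .key yields the original string key, so command-line style arguments
  // stay in sync with the entry definition instead of repeating the literal.
  val clArgs: Seq[String] = Seq(
    "--name", "myApp",
    "--class", "Foo",
    "--conf", s"${DYN_ALLOCATION_ENABLED.key}=true",
    "thejar.jar")
}
```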

View file

@ -457,9 +457,9 @@ class StandaloneDynamicAllocationSuite
test("initial executor limit") {
val initialExecutorLimit = 1
val myConf = appConf
.set("spark.dynamicAllocation.enabled", "true")
.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
.set("spark.dynamicAllocation.initialExecutors", initialExecutorLimit.toString)
.set(config.DYN_ALLOCATION_ENABLED, true)
.set(config.SHUFFLE_SERVICE_ENABLED, true)
.set(config.DYN_ALLOCATION_INITIAL_EXECUTORS, initialExecutorLimit)
sc = new SparkContext(myConf)
val appId = sc.applicationId
eventually(timeout(10.seconds), interval(10.millis)) {

View file

@ -33,6 +33,7 @@ import org.scalatest.mockito.MockitoSugar
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Network
import org.apache.spark.network.{BlockDataManager, BlockTransferService}
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.shuffle.BlockFetchingListener
@ -101,8 +102,8 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
.set(NETWORK_AUTH_ENABLED, true)
.set(AUTH_SECRET, "good")
.set("spark.app.id", "app-id")
.set("spark.network.crypto.enabled", "true")
.set("spark.network.crypto.saslFallback", "false")
.set(Network.NETWORK_CRYPTO_ENABLED, true)
.set(Network.NETWORK_CRYPTO_SASL_FALLBACK, false)
testConnection(conf, conf) match {
case Success(_) => // expected
case Failure(t) => fail(t)

View file

@ -37,6 +37,7 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark.{SecurityManager, SparkConf, SparkEnv, SparkException, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Network
import org.apache.spark.util.{ThreadUtils, Utils}
/**
@ -172,8 +173,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val conf = new SparkConf()
val shortProp = "spark.rpc.short.timeout"
conf.set("spark.rpc.retry.wait", "0")
conf.set("spark.rpc.numRetries", "1")
conf.set(Network.RPC_RETRY_WAIT, 0L)
conf.set(Network.RPC_NUM_RETRIES, 1)
val anotherEnv = createRpcEnv(conf, "remote", 0, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "ask-timeout")
@ -709,8 +710,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
testSend(new SparkConf()
.set(NETWORK_AUTH_ENABLED, true)
.set(AUTH_SECRET, "good")
.set("spark.network.crypto.enabled", "true")
.set("spark.network.crypto.saslFallback", "false"))
.set(Network.NETWORK_CRYPTO_ENABLED, true)
.set(Network.NETWORK_CRYPTO_SASL_FALLBACK, false))
}
test("ask with authentication") {
@ -730,8 +731,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
testAsk(new SparkConf()
.set(NETWORK_AUTH_ENABLED, true)
.set(AUTH_SECRET, "good")
.set("spark.network.crypto.enabled", "true")
.set("spark.network.crypto.saslFallback", "false"))
.set(Network.NETWORK_CRYPTO_ENABLED, true)
.set(Network.NETWORK_CRYPTO_SASL_FALLBACK, false))
}
test("construct RpcTimeout with conf property") {

View file

@ -58,7 +58,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
"With default settings, job can succeed despite multiple bad executors on node",
extraConfs = Seq(
config.BLACKLIST_ENABLED.key -> "true",
config.MAX_TASK_FAILURES.key -> "4",
config.TASK_MAX_FAILURES.key -> "4",
TEST_N_HOSTS.key -> "2",
TEST_N_EXECUTORS_HOST.key -> "5",
TEST_N_CORES_EXECUTOR.key -> "10"
@ -106,7 +106,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
TEST_N_HOSTS.key -> "2",
TEST_N_EXECUTORS_HOST.key -> "1",
TEST_N_CORES_EXECUTOR.key -> "1",
"spark.scheduler.blacklist.unschedulableTaskSetTimeout" -> "0s"
config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0s"
)
) {
def runBackend(): Unit = {

View file

@ -443,20 +443,20 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
(2, 2),
(2, 3)
).foreach { case (maxTaskFailures, maxNodeAttempts) =>
conf.set(config.MAX_TASK_FAILURES, maxTaskFailures)
conf.set(config.TASK_MAX_FAILURES, maxTaskFailures)
conf.set(config.MAX_TASK_ATTEMPTS_PER_NODE.key, maxNodeAttempts.toString)
val excMsg = intercept[IllegalArgumentException] {
BlacklistTracker.validateBlacklistConfs(conf)
}.getMessage()
assert(excMsg === s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key} " +
s"( = ${maxNodeAttempts}) was >= ${config.MAX_TASK_FAILURES.key} " +
s"( = ${maxNodeAttempts}) was >= ${config.TASK_MAX_FAILURES.key} " +
s"( = ${maxTaskFailures} ). Though blacklisting is enabled, with this configuration, " +
s"Spark will not be robust to one bad node. Decrease " +
s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase ${config.MAX_TASK_FAILURES.key}, " +
s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase ${config.TASK_MAX_FAILURES.key}, " +
s"or disable blacklisting with ${config.BLACKLIST_ENABLED.key}")
}
conf.remove(config.MAX_TASK_FAILURES)
conf.remove(config.TASK_MAX_FAILURES)
conf.remove(config.MAX_TASK_ATTEMPTS_PER_NODE)
Seq(

View file

@ -24,6 +24,8 @@ import scala.concurrent.duration._
import org.scalatest.concurrent.Eventually
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.internal.config.CPUS_PER_TASK
import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{RpcUtils, SerializableBuffer}
@ -34,7 +36,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
test("serialized task larger than max RPC message size") {
val conf = new SparkConf
conf.set("spark.rpc.message.maxSize", "1")
conf.set(RPC_MESSAGE_MAX_SIZE, 1)
conf.set("spark.default.parallelism", "1")
sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
@ -62,7 +64,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
test("compute max number of concurrent tasks can be launched when spark.task.cpus > 1") {
val conf = new SparkConf()
.set("spark.task.cpus", "2")
.set(CPUS_PER_TASK, 2)
.setMaster("local-cluster[4, 3, 1024]")
.setAppName("test")
sc = new SparkContext(conf)
@ -76,7 +78,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
test("compute max number of concurrent tasks can be launched when some executors are busy") {
val conf = new SparkConf()
.set("spark.task.cpus", "2")
.set(CPUS_PER_TASK, 2)
.setMaster("local-cluster[4, 3, 1024]")
.setAppName("test")
sc = new SparkContext(conf)
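
`spark.task.cpus` becomes the typed `CPUS_PER_TASK` entry above. A hedged declaration sketch of an `Int` entry of this kind; the `checkValue` guard is an assumption added for illustration, not something this patch necessarily introduces:

```scala
package org.apache.spark.internal.config

private[spark] object TaskConfigSketch {
  // Illustrative only: an Int entry with a simple sanity check and a default.
  val CPUS_PER_TASK_SKETCH =
    ConfigBuilder("spark.task.cpus")
      .intConf
      .checkValue(_ > 0, "Number of cores per task must be positive.")
      .createWithDefault(1)
}
```

Suites that build key/value string pairs, such as `setupScheduler(config.CPUS_PER_TASK.key -> "2")` further down, still reach the raw key through `.key`.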

View file

@ -21,6 +21,7 @@ import java.io.FileNotFoundException
import java.util.Properties
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE
import org.apache.spark.scheduler.SchedulingMode._
/**
@ -31,7 +32,6 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
val LOCAL = "local"
val APP_NAME = "PoolSuite"
val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file"
val TEST_POOL = "testPool"
def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl)
@ -80,7 +80,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
*/
test("Fair Scheduler Test") {
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath)
val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath)
sc = new SparkContext(LOCAL, APP_NAME, conf)
val taskScheduler = new TaskSchedulerImpl(sc)
@ -182,7 +182,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
test("SPARK-17663: FairSchedulableBuilder sets default values for blank or invalid datas") {
val xmlPath = getClass.getClassLoader.getResource("fairscheduler-with-invalid-data.xml")
.getFile()
val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath)
val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath)
val rootPool = new Pool("", FAIR, 0, 0)
val schedulableBuilder = new FairSchedulableBuilder(rootPool, conf)
@ -218,7 +218,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, taskScheduler)
val properties = new Properties()
properties.setProperty("spark.scheduler.pool", TEST_POOL)
properties.setProperty(SparkContext.SPARK_SCHEDULER_POOL, TEST_POOL)
// When FIFO Scheduler is used and task sets are submitted, they should be added to
// the root pool, and no additional pools should be created
@ -296,7 +296,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
test("Fair Scheduler should build fair scheduler when " +
"valid spark.scheduler.allocation.file property is set") {
val xmlPath = getClass.getClassLoader.getResource("fairscheduler-with-valid-data.xml").getFile()
val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath)
val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath)
sc = new SparkContext(LOCAL, APP_NAME, conf)
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
@ -326,7 +326,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
test("Fair Scheduler should throw FileNotFoundException " +
"when invalid spark.scheduler.allocation.file property is set") {
val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, "INVALID_FILE_PATH")
val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, "INVALID_FILE_PATH")
sc = new SparkContext(LOCAL, APP_NAME, conf)
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)

View file

@ -34,6 +34,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark._
import org.apache.spark.TaskState._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.SCHEDULER_REVIVE_INTERVAL
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{CallSite, ThreadUtils, Utils}
@ -290,7 +291,7 @@ private[spark] abstract class MockBackend(
// Periodically revive offers to allow delay scheduling to work
private val reviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
private val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "10ms")
private val reviveIntervalMs = conf.get(SCHEDULER_REVIVE_INTERVAL).getOrElse(10L)
/**
* Test backends should call this to get a task that has been assigned to them by the scheduler.

View file

@ -29,6 +29,7 @@ import org.scalatest.Matchers
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
@ -358,7 +359,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}
test("onTaskGettingResult() called when result fetched remotely") {
val conf = new SparkConf().set("spark.rpc.message.maxSize", "1")
val conf = new SparkConf().set(RPC_MESSAGE_MAX_SIZE, 1)
sc = new SparkContext("local", "SparkListenerSuite", conf)
val listener = new SaveTaskEvents
sc.addSparkListener(listener)

View file

@ -35,6 +35,7 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark._
import org.apache.spark.TestUtils.JavaSourceFromString
import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
import org.apache.spark.storage.TaskResultBlockId
import org.apache.spark.util.{MutableURLClassLoader, RpcUtils, Utils}
@ -110,7 +111,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
// Set the RPC message size to be as small as possible (it must be an integer, so 1 is as small
// as we can make it) so the tests don't take too long.
def conf: SparkConf = new SparkConf().set("spark.rpc.message.maxSize", "1")
def conf: SparkConf = new SparkConf().set(RPC_MESSAGE_MAX_SIZE, 1)
test("handling results smaller than max RPC message size") {
sc = new SparkContext("local", "test", conf)

View file

@ -92,7 +92,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
sc = new SparkContext(conf)
taskScheduler =
new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) {
new TaskSchedulerImpl(sc, sc.conf.get(config.TASK_MAX_FAILURES)) {
override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): TaskSetManager = {
val tsm = super.createTaskSetManager(taskSet, maxFailures)
// we need to create a spied tsm just so we can set the TaskSetBlacklist
@ -155,7 +155,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
test("Scheduler correctly accounts for multiple CPUs per task") {
val taskCpus = 2
val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString)
// Give zero core offers. Should not generate any tasks
val zeroCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 0),
new WorkerOffer("executor1", "host1", 0))
@ -185,7 +185,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
test("Scheduler does not crash when tasks are not serializable") {
val taskCpus = 2
val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString)
val numFreeCores = 1
val taskSet = new TaskSet(
Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
@ -1208,7 +1208,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
test("don't schedule for a barrier taskSet if available slots are less than pending tasks") {
val taskCpus = 2
val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString)
val numFreeCores = 3
val workerOffers = IndexedSeq(
@ -1225,7 +1225,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
test("schedule tasks for a barrier taskSet if all tasks can be launched together") {
val taskCpus = 2
val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString)
val numFreeCores = 3
val workerOffers = IndexedSeq(

View file

@ -680,7 +680,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
}
test("[SPARK-13931] taskSetManager should not send Resubmitted tasks after being a zombie") {
val conf = new SparkConf().set("spark.speculation", "true")
val conf = new SparkConf().set(config.SPECULATION_ENABLED, true)
sc = new SparkContext("local", "test", conf)
sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
@ -747,12 +747,12 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("[SPARK-22074] Task killed by other attempt task should not be resubmitted") {
val conf = new SparkConf().set("spark.speculation", "true")
val conf = new SparkConf().set(config.SPECULATION_ENABLED, true)
sc = new SparkContext("local", "test", conf)
// Set the speculation multiplier to be 0 so speculative tasks are launched immediately
sc.conf.set("spark.speculation.multiplier", "0.0")
sc.conf.set("spark.speculation.quantile", "0.5")
sc.conf.set("spark.speculation", "true")
sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
sc.conf.set(config.SPECULATION_ENABLED, true)
var killTaskCalled = false
sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
@ -1013,8 +1013,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val taskSet = FakeTask.createTaskSet(4)
// Set the speculation multiplier to be 0 so speculative tasks are launched immediately
sc.conf.set("spark.speculation.multiplier", "0.0")
sc.conf.set("spark.speculation", "true")
sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
sc.conf.set(config.SPECULATION_ENABLED, true)
val clock = new ManualClock()
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
@ -1070,9 +1070,9 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val taskSet = FakeTask.createTaskSet(5)
// Set the speculation multiplier to be 0 so speculative tasks are launched immediately
sc.conf.set("spark.speculation.multiplier", "0.0")
sc.conf.set("spark.speculation.quantile", "0.6")
sc.conf.set("spark.speculation", "true")
sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
sc.conf.set(config.SPECULATION_QUANTILE, 0.6)
sc.conf.set(config.SPECULATION_ENABLED, true)
val clock = new ManualClock()
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
@ -1366,12 +1366,12 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
}
test("[SPARK-24677] Avoid NoSuchElementException from MedianHeap") {
val conf = new SparkConf().set("spark.speculation", "true")
val conf = new SparkConf().set(config.SPECULATION_ENABLED, true)
sc = new SparkContext("local", "test", conf)
// Set the speculation multiplier to be 0 so speculative tasks are launched immediately
sc.conf.set("spark.speculation.multiplier", "0.0")
sc.conf.set("spark.speculation.quantile", "0.1")
sc.conf.set("spark.speculation", "true")
sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
sc.conf.set(config.SPECULATION_QUANTILE, 0.1)
sc.conf.set(config.SPECULATION_ENABLED, true)
sched = new FakeTaskScheduler(sc)
sched.initialize(new FakeSchedulerBackend())
@ -1416,13 +1416,13 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("SPARK-24755 Executor loss can cause task to not be resubmitted") {
val conf = new SparkConf().set("spark.speculation", "true")
val conf = new SparkConf().set(config.SPECULATION_ENABLED, true)
sc = new SparkContext("local", "test", conf)
// Set the speculation multiplier to be 0 so speculative tasks are launched immediately
sc.conf.set("spark.speculation.multiplier", "0.0")
sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
sc.conf.set("spark.speculation.quantile", "0.5")
sc.conf.set("spark.speculation", "true")
sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
sc.conf.set(config.SPECULATION_ENABLED, true)
var killTaskCalled = false
sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
@ -1538,8 +1538,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val taskSet = FakeTask.createTaskSet(4)
// Set the speculation multiplier to be 0 so speculative tasks are launched immediately
sc.conf.set("spark.speculation.multiplier", "0.0")
sc.conf.set("spark.speculation", "true")
sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
sc.conf.set(config.SPECULATION_ENABLED, true)
val clock = new ManualClock()
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>

View file

@ -30,7 +30,7 @@ class KryoSerializerDistributedSuite extends SparkFunSuite with LocalSparkContex
val conf = new SparkConf(false)
.set(config.SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
.set(config.Kryo.KRYO_USER_REGISTRATORS, classOf[AppJarRegistrator].getName)
.set(config.MAX_TASK_FAILURES, 1)
.set(config.TASK_MAX_FAILURES, 1)
.set(config.BLACKLIST_ENABLED, false)
val jar = TestUtils.createJarWithClasses(List(AppJarRegistrator.customClassName))
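
Several reads above drop an inline default: `getInt("spark.task.maxFailures", 4)` becomes `conf.get(config.TASK_MAX_FAILURES)`, so the default lives once, next to the entry. A small before/after sketch, assuming the declared default matches the old inline `4`:

```scala
package org.apache.spark

import org.apache.spark.internal.config

object TaskFailuresSketch {
  def maxFailures(conf: SparkConf): Int = {
    // Before: every caller restated the default.
    //   conf.getInt("spark.task.maxFailures", 4)
    // After: the default is attached to the entry, so this returns it when unset.
    conf.get(config.TASK_MAX_FAILURES)
  }
}
```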

View file

@ -154,7 +154,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
val jobProps = new Properties()
jobProps.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "jobGroup")
jobProps.setProperty("spark.scheduler.pool", "schedPool")
jobProps.setProperty(SparkContext.SPARK_SCHEDULER_POOL, "schedPool")
listener.onJobStart(SparkListenerJobStart(1, time, stages, jobProps))

View file

@ -833,32 +833,32 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
conf.set(SUBMIT_DEPLOY_MODE, "client")
assert(Utils.isDynamicAllocationEnabled(conf) === false)
assert(Utils.isDynamicAllocationEnabled(
conf.set("spark.dynamicAllocation.enabled", "false")) === false)
conf.set(DYN_ALLOCATION_ENABLED, false)) === false)
assert(Utils.isDynamicAllocationEnabled(
conf.set("spark.dynamicAllocation.enabled", "true")) === true)
conf.set(DYN_ALLOCATION_ENABLED, true)) === true)
assert(Utils.isDynamicAllocationEnabled(
conf.set("spark.executor.instances", "1")) === true)
assert(Utils.isDynamicAllocationEnabled(
conf.set("spark.executor.instances", "0")) === true)
assert(Utils.isDynamicAllocationEnabled(conf.set("spark.master", "local")) === false)
assert(Utils.isDynamicAllocationEnabled(conf.set("spark.dynamicAllocation.testing", "true")))
assert(Utils.isDynamicAllocationEnabled(conf.set(DYN_ALLOCATION_TESTING, true)))
}
test("getDynamicAllocationInitialExecutors") {
val conf = new SparkConf()
assert(Utils.getDynamicAllocationInitialExecutors(conf) === 0)
assert(Utils.getDynamicAllocationInitialExecutors(
conf.set("spark.dynamicAllocation.minExecutors", "3")) === 3)
conf.set(DYN_ALLOCATION_MIN_EXECUTORS, 3)) === 3)
assert(Utils.getDynamicAllocationInitialExecutors( // should use minExecutors
conf.set("spark.executor.instances", "2")) === 3)
assert(Utils.getDynamicAllocationInitialExecutors( // should use executor.instances
conf.set("spark.executor.instances", "4")) === 4)
assert(Utils.getDynamicAllocationInitialExecutors( // should use executor.instances
conf.set("spark.dynamicAllocation.initialExecutors", "3")) === 4)
conf.set(DYN_ALLOCATION_INITIAL_EXECUTORS, 3)) === 4)
assert(Utils.getDynamicAllocationInitialExecutors( // should use initialExecutors
conf.set("spark.dynamicAllocation.initialExecutors", "5")) === 5)
conf.set(DYN_ALLOCATION_INITIAL_EXECUTORS, 5)) === 5)
assert(Utils.getDynamicAllocationInitialExecutors( // should use minExecutors
conf.set("spark.dynamicAllocation.initialExecutors", "2")
conf.set(DYN_ALLOCATION_INITIAL_EXECUTORS, 2)
.set("spark.executor.instances", "1")) === 3)
}
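
The assertions above pin down the precedence for the initial executor count: the largest of `DYN_ALLOCATION_MIN_EXECUTORS`, `DYN_ALLOCATION_INITIAL_EXECUTORS`, and `spark.executor.instances`. A hedged restatement of that rule with typed entries; the production helper is `Utils.getDynamicAllocationInitialExecutors`, and this sketch only mirrors the behaviour the tests assert:

```scala
package org.apache.spark

import org.apache.spark.internal.config._

object InitialExecutorsSketch {
  // Take the largest of the three sources, treating a missing
  // spark.executor.instances as 0, as the UtilsSuite assertions above expect.
  def initialExecutors(conf: SparkConf): Int = Seq(
    conf.get(DYN_ALLOCATION_MIN_EXECUTORS),
    conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS),
    conf.getInt("spark.executor.instances", 0)
  ).max
}
```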

View file

@ -20,6 +20,7 @@ package org.apache.spark.mllib.util
import org.scalatest.{BeforeAndAfterAll, Suite}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
trait LocalClusterSparkContext extends BeforeAndAfterAll { self: Suite =>
@transient var sc: SparkContext = _
@ -29,7 +30,7 @@ trait LocalClusterSparkContext extends BeforeAndAfterAll { self: Suite =>
val conf = new SparkConf()
.setMaster("local-cluster[2, 1, 1024]")
.setAppName("test-cluster")
.set("spark.rpc.message.maxSize", "1") // set to 1MB to detect direct serialization of data
.set(RPC_MESSAGE_MAX_SIZE, 1) // set to 1MB to detect direct serialization of data
sc = new SparkContext(conf)
}
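
As the comment above notes, this entry's value is a count of megabytes, so `.set(RPC_MESSAGE_MAX_SIZE, 1)` caps RPC frames at roughly 1 MiB. A simplified sketch of the conversion the RPC layer performs; it ignores any upper-bound validation the real helper applies:

```scala
package org.apache.spark

import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
import org.apache.spark.util.RpcUtils

object MessageSizeSketch {
  // The entry holds MB; the RPC layer works in bytes.
  def maxMessageSizeBytes(conf: SparkConf): Int =
    conf.get(RPC_MESSAGE_MAX_SIZE) * 1024 * 1024

  // The production helper used elsewhere in these suites.
  def viaRpcUtils(conf: SparkConf): Int = RpcUtils.maxMessageSizeBytes(conf)
}
```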

View file

@ -26,6 +26,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
@ -47,7 +48,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
ExecutionContext.fromExecutorService(requestExecutorsService)
protected override val minRegisteredRatio =
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
if (conf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).isEmpty) {
0.8
} else {
super.minRegisteredRatio

View file

@ -630,8 +630,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
.registerDriverWithShuffleService(
slave.hostname,
externalShufflePort,
sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s"),
sc.conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT),
sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL))
slave.shuffleRegistered = true
}
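
The Mesos hunk above collapses a two-level string fallback (`spark.storage.blockManagerSlaveTimeoutMs`, then the network timeout) into a single read of `STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT`. That works because a time entry can carry its default, possibly written as a duration string, in the declaration itself. A hedged declaration sketch; the default shown is illustrative, not taken from this patch:

```scala
package org.apache.spark.internal.config

import java.util.concurrent.TimeUnit

private[spark] object StorageConfigSketch {
  // Illustrative: a millisecond-typed entry whose default is written as a
  // human-readable duration, standing in for the old nested getTimeAs* calls.
  val BLOCKMANAGER_SLAVE_TIMEOUT_SKETCH =
    ConfigBuilder("spark.storage.blockManagerSlaveTimeoutMs")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("120s")
}
```

Callers then read milliseconds with a single `conf.get`, as the backend above now does.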

View file

@ -591,7 +591,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val expectedCores = 1
setBackend(Map(
"spark.cores.max" -> expectedCores.toString,
"spark.scheduler.minRegisteredResourcesRatio" -> "1.0"))
SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO.key -> "1.0"))
val offers = List(Resources(backend.executorMemory(sc), expectedCores))
offerResources(offers)
@ -604,8 +604,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("supports data locality with dynamic allocation") {
setBackend(Map(
"spark.dynamicAllocation.enabled" -> "true",
"spark.dynamicAllocation.testing" -> "true",
DYN_ALLOCATION_ENABLED.key -> "true",
DYN_ALLOCATION_TESTING.key -> "true",
"spark.locality.wait" -> "1s"))
assert(backend.getExecutorIds().isEmpty)

View file

@ -33,6 +33,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.internal.config.UI._
import org.apache.spark.rpc._
import org.apache.spark.scheduler._
@ -52,7 +53,7 @@ private[spark] abstract class YarnSchedulerBackend(
private val stopped = new AtomicBoolean(false)
override val minRegisteredRatio =
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
if (conf.get(config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).isEmpty) {
0.8
} else {
super.minRegisteredRatio

View file

@ -29,6 +29,7 @@ import org.apache.spark._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Network._
import org.apache.spark.network.shuffle.ShuffleTestAccessor
import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor}
import org.apache.spark.tags.ExtendedYarnTest
@ -94,14 +95,14 @@ class YarnShuffleAuthSuite extends YarnShuffleIntegrationSuite {
override def newYarnConfig(): YarnConfiguration = {
val yarnConfig = super.newYarnConfig()
yarnConfig.set(NETWORK_AUTH_ENABLED.key, "true")
yarnConfig.set(NETWORK_ENCRYPTION_ENABLED.key, "true")
yarnConfig.set(NETWORK_CRYPTO_ENABLED.key, "true")
yarnConfig
}
override protected def extraSparkConf(): Map[String, String] = {
super.extraSparkConf() ++ Map(
NETWORK_AUTH_ENABLED.key -> "true",
NETWORK_ENCRYPTION_ENABLED.key -> "true"
NETWORK_CRYPTO_ENABLED.key -> "true"
)
}

View file

@ -63,7 +63,7 @@ class RuntimeConfigSuite extends SparkFunSuite {
assert(!conf.isModifiable("spark.sql.sources.schemaStringLengthThreshold"))
assert(conf.isModifiable("spark.sql.streaming.checkpointLocation"))
// Core configs
assert(!conf.isModifiable("spark.task.cpus"))
assert(!conf.isModifiable(config.CPUS_PER_TASK.key))
assert(!conf.isModifiable("spark.executor.cores"))
// Invalid config parameters
assert(!conf.isModifiable(""))

View file

@ -35,6 +35,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark._
import org.apache.spark.LocalSparkContext._
import org.apache.spark.internal.config.Network.RPC_NUM_RETRIES
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.util.quietly
@ -393,7 +394,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
.set(StateStore.MAINTENANCE_INTERVAL_CONFIG, "10ms")
// Make sure that when SparkContext stops, the StateStore maintenance thread 'quickly'
// fails to talk to the StateStoreCoordinator and unloads all the StateStores
.set("spark.rpc.numRetries", "1")
.set(RPC_NUM_RETRIES, 1)
val opId = 0
val dir = newDir()
val storeProviderId = StateStoreProviderId(StateStoreId(dir, opId, 0), UUID.randomUUID)

View file

@ -603,7 +603,7 @@ class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite {
val conf = new SparkConf()
.setMaster("local")
.setAppName("test")
.set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly
.set(config.TASK_MAX_FAILURES, 1) // Don't retry the tasks to run this test quickly
.set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
.set(ASYNC_TRACKING_ENABLED, false)
withSpark(new SparkContext(conf)) { sc =>

View file

@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.internal.config.SPECULATION_ENABLED
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader._

View file

@ -32,6 +32,7 @@ import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext}
import org.apache.spark.sql.execution.HiveResult
@ -226,7 +227,7 @@ private[hive] class SparkExecuteStatementOperation(
sqlContext.sparkContext.setJobGroup(statementId, statement)
val pool = sessionToActivePool.get(parentSession.getSessionHandle)
if (pool != null) {
sqlContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool)
}
try {
result = sqlContext.sql(statement)
@ -234,7 +235,8 @@ private[hive] class SparkExecuteStatementOperation(
result.queryExecution.logical match {
case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
sessionToActivePool.put(parentSession.getSessionHandle, value)
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
logInfo(s"Setting ${SparkContext.SPARK_SCHEDULER_POOL}=$value for future statements " +
"in this session.")
case _ =>
}
HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
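
Not every string key becomes a `ConfigEntry`: `spark.scheduler.pool` is applied as a job-local property, so the patch reuses the plain `SparkContext.SPARK_SCHEDULER_POOL` string constant. A usage sketch, with an invented pool name and assuming the `org.apache.spark` package for visibility:

```scala
package org.apache.spark

object SchedulerPoolSketch {
  def runInPool(sc: SparkContext): Unit = {
    // Local properties are keyed by plain strings, so a shared String constant
    // fits here better than a ConfigEntry would.
    sc.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, "productionPool")
  }
}
```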

View file

@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.{JobConf, Reporter}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.SPECULATION_ENABLED
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
@ -69,7 +70,7 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
// When speculation is on and output committer class name contains "Direct", we should warn
// users that they may lose data if they are using a direct output committer.
val speculationEnabled = sparkSession.sparkContext.conf.getBoolean("spark.speculation", false)
val speculationEnabled = sparkSession.sparkContext.conf.get(SPECULATION_ENABLED)
val outputCommitterClass = conf.get("mapred.output.committer.class", "")
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
val warningMessage =

View file

@ -25,6 +25,7 @@ import org.scalatest.mockito.MockitoSugar
import org.scalatest.time.SpanSugar._
import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING}
import org.apache.spark.streaming.{DummyInputDStream, Seconds, StreamingContext}
import org.apache.spark.util.{ManualClock, Utils}
@ -332,8 +333,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
val confWithBothDynamicAllocationEnabled = new SparkConf()
.set("spark.streaming.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
.set(DYN_ALLOCATION_ENABLED, true)
.set(DYN_ALLOCATION_TESTING, true)
require(Utils.isDynamicAllocationEnabled(confWithBothDynamicAllocationEnabled) === true)
withStreamingContext(confWithBothDynamicAllocationEnabled) { ssc =>
intercept[IllegalArgumentException] {