[SPARK-26466][CORE] Use ConfigEntry for hardcoded configs for submit categories.

## What changes were proposed in this pull request?

This PR migrates the hardcoded configs under the namespaces below to use `ConfigEntry`.

* spark.kryo
* spark.kryoserializer
* spark.serializer
* spark.jars
* spark.files
* spark.submit
* spark.deploy
* spark.worker

This patch doesn't change configs that are not relevant to `SparkConf` (e.g. system properties).
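
For reference, here is a minimal sketch of the migration pattern the diff applies throughout: a typed `ConfigEntry` is declared once in a config object, and call sites read it through `SparkConf.get`, so the key, value type, and default live in one place. The `DeploySketch` object name is illustrative only; the real entries are added in the new `Deploy`, `Kryo`, and `Worker` objects shown below.

```scala
package org.apache.spark.internal.config

// Illustrative stand-in for the Deploy config object added by this PR.
private[spark] object DeploySketch {
  // Key, value type, and default are declared once, next to each other.
  val RETAINED_APPLICATIONS = ConfigBuilder("spark.deploy.retainedApplications")
    .intConf
    .createWithDefault(200)
}

// Call sites switch from string keys plus inline defaults to typed lookups:
//
//   before: conf.getInt("spark.deploy.retainedApplications", 200)
//   after:  conf.get(DeploySketch.RETAINED_APPLICATIONS)   // returns Int
//
// Sequence-valued entries (e.g. JARS, built with .stringConf.toSequence) return a
// Seq[String], so callers no longer split or join comma-separated strings:
//
//   before: conf.getOption("spark.jars").map(_.split(",").toSeq).getOrElse(Seq.empty)
//   after:  conf.get(JARS)
```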

## How was this patch tested?

Existing tests.

Closes #23532 from HeartSaVioR/SPARK-26466-v2.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Jungtaek Lim (HeartSaVioR) 2019-01-16 20:57:21 -06:00 committed by Sean Owen
parent 272428db6f
commit 38f030725c
69 changed files with 528 additions and 280 deletions

View file

@ -28,6 +28,7 @@ import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils
@ -123,7 +124,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
/** Set JAR files to distribute to the cluster. */
def setJars(jars: Seq[String]): SparkConf = {
for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
set("spark.jars", jars.filter(_ != null).mkString(","))
set(JARS, jars.filter(_ != null))
}
/** Set JAR files to distribute to the cluster. (Java-friendly version.) */
@ -201,12 +202,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
*/
def registerKryoClasses(classes: Array[Class[_]]): SparkConf = {
val allClassNames = new LinkedHashSet[String]()
allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').map(_.trim)
allClassNames ++= get(KRYO_CLASSES_TO_REGISTER).map(_.trim)
.filter(!_.isEmpty)
allClassNames ++= classes.map(_.getName)
set("spark.kryo.classesToRegister", allClassNames.mkString(","))
set("spark.serializer", classOf[KryoSerializer].getName)
set(KRYO_CLASSES_TO_REGISTER, allClassNames.toSeq)
set(SERIALIZER, classOf[KryoSerializer].getName)
this
}
@ -547,20 +548,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
case "yarn-cluster" =>
logWarning(warning)
set("spark.master", "yarn")
set("spark.submit.deployMode", "cluster")
set(SUBMIT_DEPLOY_MODE, "cluster")
case "yarn-client" =>
logWarning(warning)
set("spark.master", "yarn")
set("spark.submit.deployMode", "client")
set(SUBMIT_DEPLOY_MODE, "client")
case _ => // Any other unexpected master will be checked when creating scheduler backend.
}
}
if (contains("spark.submit.deployMode")) {
get("spark.submit.deployMode") match {
if (contains(SUBMIT_DEPLOY_MODE)) {
get(SUBMIT_DEPLOY_MODE) match {
case "cluster" | "client" =>
case e => throw new SparkException("spark.submit.deployMode can only be \"cluster\" or " +
"\"client\".")
case e => throw new SparkException(s"${SUBMIT_DEPLOY_MODE.key} can only be " +
"\"cluster\" or \"client\".")
}
}

View file

@ -229,7 +229,7 @@ class SparkContext(config: SparkConf) extends Logging {
def jars: Seq[String] = _jars
def files: Seq[String] = _files
def master: String = _conf.get("spark.master")
def deployMode: String = _conf.getOption("spark.submit.deployMode").getOrElse("client")
def deployMode: String = _conf.get(SUBMIT_DEPLOY_MODE)
def appName: String = _conf.get("spark.app.name")
private[spark] def isEventLogEnabled: Boolean = _conf.get(EVENT_LOG_ENABLED)
@ -2640,7 +2640,7 @@ object SparkContext extends Logging {
case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
case "yarn" =>
if (conf != null && conf.getOption("spark.submit.deployMode").contains("cluster")) {
if (conf != null && conf.get(SUBMIT_DEPLOY_MODE) == "cluster") {
conf.getInt(DRIVER_CORES.key, 0)
} else {
0

View file

@ -274,14 +274,13 @@ object SparkEnv extends Logging {
}
}
// Create an instance of the class named by the given SparkConf property, or defaultClassName
// Create an instance of the class named by the given SparkConf property
// if the property is not set, possibly initializing it with our conf
def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
instantiateClass[T](conf.get(propertyName, defaultClassName))
def instantiateClassFromConf[T](propertyName: ConfigEntry[String]): T = {
instantiateClass[T](conf.get(propertyName))
}
val serializer = instantiateClassFromConf[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
val serializer = instantiateClassFromConf[Serializer](SERIALIZER)
logDebug(s"Using serializer: ${serializer.getClass}")
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

View file

@ -23,6 +23,7 @@ import java.util.Arrays
import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.python.PythonUtils
import org.apache.spark.internal.config._
private[spark] object RUtils {
// Local path where R binary packages built from R source code contained in the spark
@ -63,7 +64,7 @@ private[spark] object RUtils {
(sys.props("spark.master"), sys.props("spark.submit.deployMode"))
} else {
val sparkConf = SparkEnv.get.conf
(sparkConf.get("spark.master"), sparkConf.get("spark.submit.deployMode", "client"))
(sparkConf.get("spark.master"), sparkConf.get(SUBMIT_DEPLOY_MODE))
}
val isYarnCluster = master != null && master.contains("yarn") && deployMode == "cluster"

View file

@ -60,7 +60,7 @@ import org.apache.spark.util.{ThreadUtils, Utils}
private object FaultToleranceTest extends App with Logging {
private val conf = new SparkConf()
private val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")
private val zkDir = conf.get(config.Deploy.ZOOKEEPER_DIRECTORY).getOrElse("/spark")
private val masters = ListBuffer[TestMasterInfo]()
private val workers = ListBuffer[TestWorkerInfo]()
@ -87,8 +87,8 @@ private object FaultToleranceTest extends App with Logging {
terminateCluster()
// Clear ZK directories in between tests (for speed purposes)
SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/spark_leader")
SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/master_status")
SparkCuratorUtil.deleteRecursive(zk, zkDir + "/spark_leader")
SparkCuratorUtil.deleteRecursive(zk, zkDir + "/master_status")
}
test("sanity-basic") {

View file

@ -25,6 +25,7 @@ import org.apache.zookeeper.KeeperException
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL
private[spark] object SparkCuratorUtil extends Logging {
@ -35,7 +36,7 @@ private[spark] object SparkCuratorUtil extends Logging {
def newClient(
conf: SparkConf,
zkUrlConf: String = "spark.deploy.zookeeper.url"): CuratorFramework = {
zkUrlConf: String = ZOOKEEPER_URL.key): CuratorFramework = {
val ZK_URL = conf.get(zkUrlConf)
val zk = CuratorFrameworkFactory.newClient(ZK_URL,
ZK_SESSION_TIMEOUT_MILLIS, ZK_CONNECTION_TIMEOUT_MILLIS,

View file

@ -437,7 +437,7 @@ private[spark] class SparkSubmit extends Logging {
}
if (localPyFiles != null) {
sparkConf.set("spark.submit.pyFiles", localPyFiles)
sparkConf.set(SUBMIT_PYTHON_FILES, localPyFiles.split(",").toSeq)
}
// In YARN mode for an R app, add the SparkR package archive and the R package
@ -614,11 +614,11 @@ private[spark] class SparkSubmit extends Logging {
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// For python and R files, the primary resource is already distributed as a regular file
if (!isYarnCluster && !args.isPython && !args.isR) {
var jars = sparkConf.getOption("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
var jars = sparkConf.get(JARS)
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
}
sparkConf.set("spark.jars", jars.mkString(","))
sparkConf.set(JARS, jars)
}
// In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
@ -681,7 +681,7 @@ private[spark] class SparkSubmit extends Logging {
// Second argument is main class
childArgs += (args.primaryResource, "")
if (args.pyFiles != null) {
sparkConf.set("spark.submit.pyFiles", args.pyFiles)
sparkConf.set(SUBMIT_PYTHON_FILES, args.pyFiles.split(",").toSeq)
}
} else if (args.isR) {
// Second argument is main class
@ -748,18 +748,17 @@ private[spark] class SparkSubmit extends Logging {
// Resolve and format python file paths properly before adding them to the PYTHONPATH.
// The resolving part is redundant in the case of --py-files, but necessary if the user
// explicitly sets `spark.submit.pyFiles` in his/her default properties file.
sparkConf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
val resolvedPyFiles = Utils.resolveURIs(pyFiles)
val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
} else {
// Ignoring formatting python path in yarn and mesos cluster mode, these two modes
// support dealing with remote python files, they could distribute and add python files
// locally.
resolvedPyFiles
}
sparkConf.set("spark.submit.pyFiles", formattedPyFiles)
val pyFiles = sparkConf.get(SUBMIT_PYTHON_FILES)
val resolvedPyFiles = Utils.resolveURIs(pyFiles.mkString(","))
val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
} else {
// Ignoring formatting python path in yarn and mesos cluster mode, these two modes
// support dealing with remote python files, they could distribute and add python files
// locally.
resolvedPyFiles
}
sparkConf.set(SUBMIT_PYTHON_FILES, formattedPyFiles.split(",").toSeq)
(childArgs, childClasspath, sparkConf, childMainClass)
}

View file

@ -34,7 +34,9 @@ import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.deploy.rest.StandaloneRestServer
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Deploy._
import org.apache.spark.internal.config.UI._
import org.apache.spark.internal.config.Worker._
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.serializer.{JavaSerializer, Serializer}
@ -56,12 +58,12 @@ private[deploy] class Master(
// For application IDs
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000
private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
private val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.maxExecutorRetries", 10)
private val workerTimeoutMs = conf.get(WORKER_TIMEOUT) * 1000
private val retainedApplications = conf.get(RETAINED_APPLICATIONS)
private val retainedDrivers = conf.get(RETAINED_DRIVERS)
private val reaperIterations = conf.get(REAPER_ITERATIONS)
private val recoveryMode = conf.get(RECOVERY_MODE)
private val maxExecutorRetries = conf.get(MAX_EXECUTOR_RETRIES)
val workers = new HashSet[WorkerInfo]
val idToApp = new HashMap[String, ApplicationInfo]
@ -113,13 +115,13 @@ private[deploy] class Master(
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
private val spreadOutApps = conf.get(SPREAD_OUT_APPS)
// Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
private val defaultCores = conf.get(DEFAULT_CORES)
val reverseProxy = conf.get(UI_REVERSE_PROXY)
if (defaultCores < 1) {
throw new SparkException("spark.deploy.defaultCores must be positive")
throw new SparkException(s"${DEFAULT_CORES.key} must be positive")
}
// Alternative application submission gateway that is stable across Spark versions
@ -151,7 +153,7 @@ private[deploy] class Master(
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
}
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
}, 0, workerTimeoutMs, TimeUnit.MILLISECONDS)
if (restServerEnabled) {
val port = conf.get(MASTER_REST_SERVER_PORT)
@ -168,7 +170,7 @@ private[deploy] class Master(
applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
val serializer = new JavaSerializer(conf)
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
val (persistenceEngine_, leaderElectionAgent_) = recoveryMode match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
@ -179,7 +181,7 @@ private[deploy] class Master(
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val clazz = Utils.classForName(conf.get(RECOVERY_MODE_FACTORY))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
@ -233,7 +235,7 @@ private[deploy] class Master(
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CompleteRecovery)
}
}, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
}, workerTimeoutMs, TimeUnit.MILLISECONDS)
}
case CompleteRecovery => completeRecovery()
@ -311,8 +313,8 @@ private[deploy] class Master(
// Important note: this code path is not exercised by tests, so be very careful when
// changing this `if` condition.
if (!normalExit
&& appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
&& MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
&& appInfo.incrementRetryCount() >= maxExecutorRetries
&& maxExecutorRetries >= 0) { // < 0 disables this application-killing path
val execs = appInfo.executors.values
if (!execs.exists(_.state == ExecutorState.RUNNING)) {
logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
@ -870,8 +872,8 @@ private[deploy] class Master(
endpointToApp -= app.driver
addressToApp -= app.driver.address
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
if (completedApps.size >= retainedApplications) {
val toRemove = math.max(retainedApplications / 10, 1)
completedApps.take(toRemove).foreach { a =>
applicationMetricsSystem.removeSource(a.appSource)
}
@ -989,14 +991,14 @@ private[deploy] class Master(
private def timeOutDeadWorkers() {
// Copy the workers into an array so we don't modify the hashset while iterating through it
val currentTime = System.currentTimeMillis()
val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
val toRemove = workers.filter(_.lastHeartbeat < currentTime - workerTimeoutMs).toArray
for (worker <- toRemove) {
if (worker.state != WorkerState.DEAD) {
logWarning("Removing %s because we got no heartbeat in %d seconds".format(
worker.id, WORKER_TIMEOUT_MS / 1000))
removeWorker(worker, s"Not receiving heartbeat for ${WORKER_TIMEOUT_MS / 1000} seconds")
worker.id, workerTimeoutMs / 1000))
removeWorker(worker, s"Not receiving heartbeat for ${workerTimeoutMs / 1000} seconds")
} else {
if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
if (worker.lastHeartbeat < currentTime - ((reaperIterations + 1) * workerTimeoutMs)) {
workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
}
}
@ -1031,8 +1033,8 @@ private[deploy] class Master(
case Some(driver) =>
logInfo(s"Removing driver: $driverId")
drivers -= driver
if (completedDrivers.size >= RETAINED_DRIVERS) {
val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
if (completedDrivers.size >= retainedDrivers) {
val toRemove = math.max(retainedDrivers / 10, 1)
completedDrivers.trimStart(toRemove)
}
completedDrivers += driver

View file

@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Deploy.RECOVERY_DIRECTORY
import org.apache.spark.serializer.Serializer
/**
@ -52,11 +53,11 @@ abstract class StandaloneRecoveryModeFactory(conf: SparkConf, serializer: Serial
private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
val recoveryDir = conf.get(RECOVERY_DIRECTORY)
def createPersistenceEngine(): PersistenceEngine = {
logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
new FileSystemPersistenceEngine(RECOVERY_DIR, serializer)
logInfo("Persisting recovery state to directory: " + recoveryDir)
new FileSystemPersistenceEngine(recoveryDir, serializer)
}
def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {

View file

@ -23,11 +23,12 @@ import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchList
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkCuratorUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Deploy.ZOOKEEPER_DIRECTORY
private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable,
conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
val workingDir = conf.get(ZOOKEEPER_DIRECTORY).getOrElse("/spark") + "/leader_election"
private var zk: CuratorFramework = _
private var leaderLatch: LeaderLatch = _
@ -38,7 +39,7 @@ private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderEle
private def start() {
logInfo("Starting ZooKeeper LeaderElection agent")
zk = SparkCuratorUtil.newClient(conf)
leaderLatch = new LeaderLatch(zk, WORKING_DIR)
leaderLatch = new LeaderLatch(zk, workingDir)
leaderLatch.addListener(this)
leaderLatch.start()
}

View file

@ -28,6 +28,7 @@ import org.apache.zookeeper.CreateMode
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkCuratorUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Deploy._
import org.apache.spark.serializer.Serializer
@ -35,22 +36,22 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer
extends PersistenceEngine
with Logging {
private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
private val workingDir = conf.get(ZOOKEEPER_DIRECTORY).getOrElse("/spark") + "/master_status"
private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
SparkCuratorUtil.mkdir(zk, WORKING_DIR)
SparkCuratorUtil.mkdir(zk, workingDir)
override def persist(name: String, obj: Object): Unit = {
serializeIntoFile(WORKING_DIR + "/" + name, obj)
serializeIntoFile(workingDir + "/" + name, obj)
}
override def unpersist(name: String): Unit = {
zk.delete().forPath(WORKING_DIR + "/" + name)
zk.delete().forPath(workingDir + "/" + name)
}
override def read[T: ClassTag](prefix: String): Seq[T] = {
zk.getChildren.forPath(WORKING_DIR).asScala
zk.getChildren.forPath(workingDir).asScala
.filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T])
}
@ -66,13 +67,13 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer
}
private def deserializeFromFile[T](filename: String)(implicit m: ClassTag[T]): Option[T] = {
val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
val fileData = zk.getData().forPath(workingDir + "/" + filename)
try {
Some(serializer.newInstance().deserialize[T](ByteBuffer.wrap(fileData)))
} catch {
case e: Exception =>
logWarning("Exception while reading persisted file, deleting", e)
zk.delete().forPath(WORKING_DIR + "/" + filename)
zk.delete().forPath(workingDir + "/" + filename)
None
}
}

View file

@ -31,6 +31,7 @@ import org.apache.spark.deploy.DeployMessages.DriverStateChanged
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Worker.WORKER_DRIVER_TERMINATE_TIMEOUT
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils}
@ -57,8 +58,7 @@ private[deploy] class DriverRunner(
@volatile private[worker] var finalException: Option[Exception] = None
// Timeout to wait for when trying to terminate a driver.
private val DRIVER_TERMINATE_TIMEOUT_MS =
conf.getTimeAsMs("spark.worker.driverTerminateTimeout", "10s")
private val driverTerminateTimeoutMs = conf.get(WORKER_DRIVER_TERMINATE_TIMEOUT)
// Decoupled for testing
def setClock(_clock: Clock): Unit = {
@ -122,7 +122,7 @@ private[deploy] class DriverRunner(
killed = true
synchronized {
process.foreach { p =>
val exitCode = Utils.terminateProcess(p, DRIVER_TERMINATE_TIMEOUT_MS)
val exitCode = Utils.terminateProcess(p, driverTerminateTimeoutMs)
if (exitCode.isEmpty) {
logWarning("Failed to terminate driver process: " + p +
". This process will likely be orphaned.")

View file

@ -39,6 +39,7 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.internal.config.UI._
import org.apache.spark.internal.config.Worker._
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
@ -74,7 +75,7 @@ private[deploy] class Worker(
// For worker and executor IDs
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
private val HEARTBEAT_MILLIS = conf.get(WORKER_TIMEOUT) * 1000 / 4
// Model retries to connect to the master, after Hadoop's model.
// The first six attempts to reconnect are in shorter intervals (between 5 and 15 seconds)
@ -93,13 +94,11 @@ private[deploy] class Worker(
private val PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(60
* REGISTRATION_RETRY_FUZZ_MULTIPLIER))
private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
private val CLEANUP_ENABLED = conf.get(WORKER_CLEANUP_ENABLED)
// How often worker will clean up old app folders
private val CLEANUP_INTERVAL_MILLIS =
conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
private val CLEANUP_INTERVAL_MILLIS = conf.get(WORKER_CLEANUP_INTERVAL) * 1000
// TTL for app folders/data; after TTL expires it will be cleaned up
private val APP_DATA_RETENTION_SECONDS =
conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
private val APP_DATA_RETENTION_SECONDS = conf.get(APP_DATA_RETENTION)
// Whether or not cleanup the non-shuffle files on executor exits.
private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
@ -111,8 +110,7 @@ private[deploy] class Worker(
* Whether to use the master address in `masterRpcAddresses` if possible. If it's disabled, Worker
* will just use the address received from Master.
*/
private val preferConfiguredMasterAddress =
conf.getBoolean("spark.worker.preferConfiguredMasterAddress", false)
private val preferConfiguredMasterAddress = conf.get(PREFER_CONFIGURED_MASTER_ADDRESS)
/**
* The master address to connect in case of failure. When the connection is broken, worker will
* use this address to connect. This is usually just one of `masterRpcAddresses`. However, when
@ -143,10 +141,8 @@ private[deploy] class Worker(
val appDirectories = new HashMap[String, Seq[String]]
val finishedApps = new HashSet[String]
val retainedExecutors = conf.getInt("spark.worker.ui.retainedExecutors",
WorkerWebUI.DEFAULT_RETAINED_EXECUTORS)
val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers",
WorkerWebUI.DEFAULT_RETAINED_DRIVERS)
val retainedExecutors = conf.get(WORKER_UI_RETAINED_EXECUTORS)
val retainedDrivers = conf.get(WORKER_UI_RETAINED_DRIVERS)
// The shuffle service is not actually started unless configured.
private val shuffleService = if (externalShuffleServiceSupplier != null) {

View file

@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory
import scala.annotation.tailrec
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Worker._
import org.apache.spark.util.{IntParam, MemoryParam, Utils}
/**
@ -59,9 +60,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
// This mutates the SparkConf, so all accesses to it must be made after this line
propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
if (conf.contains("spark.worker.ui.port")) {
webUiPort = conf.get("spark.worker.ui.port").toInt
}
conf.get(WORKER_UI_PORT).foreach { webUiPort = _ }
checkWorkerMemory()

View file

@ -56,6 +56,4 @@ class WorkerWebUI(
private[worker] object WorkerWebUI {
val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
val DEFAULT_RETAINED_DRIVERS = 1000
val DEFAULT_RETAINED_EXECUTORS = 1000
}

View file

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.internal.config
private[spark] object Deploy {
val RECOVERY_MODE = ConfigBuilder("spark.deploy.recoveryMode")
.stringConf
.createWithDefault("NONE")
val RECOVERY_MODE_FACTORY = ConfigBuilder("spark.deploy.recoveryMode.factory")
.stringConf
.createWithDefault("")
val RECOVERY_DIRECTORY = ConfigBuilder("spark.deploy.recoveryDirectory")
.stringConf
.createWithDefault("")
val ZOOKEEPER_URL = ConfigBuilder("spark.deploy.zookeeper.url")
.doc(s"When `${RECOVERY_MODE.key}` is set to ZOOKEEPER, this " +
"configuration is used to set the zookeeper URL to connect to.")
.stringConf
.createOptional
val ZOOKEEPER_DIRECTORY = ConfigBuilder("spark.deploy.zookeeper.dir")
.stringConf
.createOptional
val RETAINED_APPLICATIONS = ConfigBuilder("spark.deploy.retainedApplications")
.intConf
.createWithDefault(200)
val RETAINED_DRIVERS = ConfigBuilder("spark.deploy.retainedDrivers")
.intConf
.createWithDefault(200)
val REAPER_ITERATIONS = ConfigBuilder("spark.dead.worker.persistence")
.intConf
.createWithDefault(15)
val MAX_EXECUTOR_RETRIES = ConfigBuilder("spark.deploy.maxExecutorRetries")
.intConf
.createWithDefault(10)
val SPREAD_OUT_APPS = ConfigBuilder("spark.deploy.spreadOut")
.booleanConf
.createWithDefault(true)
val DEFAULT_CORES = ConfigBuilder("spark.deploy.defaultCores")
.intConf
.createWithDefault(Int.MaxValue)
}

View file

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.internal.config
import org.apache.spark.network.util.ByteUnit
private[spark] object Kryo {
val KRYO_REGISTRATION_REQUIRED = ConfigBuilder("spark.kryo.registrationRequired")
.booleanConf
.createWithDefault(false)
val KRYO_USER_REGISTRATORS = ConfigBuilder("spark.kryo.registrator")
.stringConf
.createOptional
val KRYO_CLASSES_TO_REGISTER = ConfigBuilder("spark.kryo.classesToRegister")
.stringConf
.toSequence
.createWithDefault(Nil)
  val KRYO_USE_UNSAFE = ConfigBuilder("spark.kryo.unsafe")
    .booleanConf
    .createWithDefault(false)
  val KRYO_USE_POOL = ConfigBuilder("spark.kryo.pool")
.booleanConf
.createWithDefault(true)
val KRYO_REFERENCE_TRACKING = ConfigBuilder("spark.kryo.referenceTracking")
.booleanConf
.createWithDefault(true)
val KRYO_SERIALIZER_BUFFER_SIZE = ConfigBuilder("spark.kryoserializer.buffer")
.bytesConf(ByteUnit.KiB)
.createWithDefaultString("64k")
val KRYO_SERIALIZER_MAX_BUFFER_SIZE = ConfigBuilder("spark.kryoserializer.buffer.max")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("64m")
}

View file

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.internal.config
import java.util.concurrent.TimeUnit
private[spark] object Worker {
val WORKER_TIMEOUT = ConfigBuilder("spark.worker.timeout")
.longConf
.createWithDefault(60)
val WORKER_DRIVER_TERMINATE_TIMEOUT = ConfigBuilder("spark.worker.driverTerminateTimeout")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10s")
val WORKER_CLEANUP_ENABLED = ConfigBuilder("spark.worker.cleanup.enabled")
.booleanConf
.createWithDefault(false)
val WORKER_CLEANUP_INTERVAL = ConfigBuilder("spark.worker.cleanup.interval")
.longConf
.createWithDefault(60 * 30)
val APP_DATA_RETENTION = ConfigBuilder("spark.worker.cleanup.appDataTtl")
.longConf
.createWithDefault(7 * 24 * 3600)
val PREFER_CONFIGURED_MASTER_ADDRESS = ConfigBuilder("spark.worker.preferConfiguredMasterAddress")
.booleanConf
.createWithDefault(false)
val WORKER_UI_PORT = ConfigBuilder("spark.worker.ui.port")
.intConf
.createOptional
val WORKER_UI_RETAINED_EXECUTORS = ConfigBuilder("spark.worker.ui.retainedExecutors")
.intConf
.createWithDefault(1000)
val WORKER_UI_RETAINED_DRIVERS = ConfigBuilder("spark.worker.ui.retainedDrivers")
.intConf
.createWithDefault(1000)
val UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF =
ConfigBuilder("spark.worker.ui.compressedLogFileLengthCacheSize")
.intConf
.createWithDefault(100)
}

View file

@ -961,4 +961,35 @@ package object config {
.intConf
.createWithDefault(4)
private[spark] val SERIALIZER = ConfigBuilder("spark.serializer")
.stringConf
.createWithDefault("org.apache.spark.serializer.JavaSerializer")
private[spark] val SERIALIZER_OBJECT_STREAM_RESET =
ConfigBuilder("spark.serializer.objectStreamReset")
.intConf
.createWithDefault(100)
private[spark] val SERIALIZER_EXTRA_DEBUG_INFO = ConfigBuilder("spark.serializer.extraDebugInfo")
.booleanConf
.createWithDefault(true)
private[spark] val JARS = ConfigBuilder("spark.jars")
.stringConf
.toSequence
.createWithDefault(Nil)
private[spark] val FILES = ConfigBuilder("spark.files")
.stringConf
.toSequence
.createWithDefault(Nil)
private[spark] val SUBMIT_DEPLOY_MODE = ConfigBuilder("spark.submit.deployMode")
.stringConf
.createWithDefault("client")
private[spark] val SUBMIT_PYTHON_FILES = ConfigBuilder("spark.submit.pyFiles")
.stringConf
.toSequence
.createWithDefault(Nil)
}

View file

@ -24,6 +24,7 @@ import scala.reflect.ClassTag
import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.config._
import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
private[spark] class JavaSerializationStream(
@ -137,8 +138,8 @@ private[spark] class JavaSerializerInstance(
*/
@DeveloperApi
class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 100)
private var extraDebugInfo = conf.getBoolean("spark.serializer.extraDebugInfo", true)
private var counterReset = conf.get(SERIALIZER_OBJECT_STREAM_RESET)
private var extraDebugInfo = conf.get(SERIALIZER_EXTRA_DEBUG_INFO)
protected def this() = this(new SparkConf()) // For deserialization only

View file

@ -39,6 +39,7 @@ import org.roaringbitmap.RoaringBitmap
import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
@ -58,34 +59,34 @@ class KryoSerializer(conf: SparkConf)
with Logging
with Serializable {
private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k")
private val bufferSizeKb = conf.get(KRYO_SERIALIZER_BUFFER_SIZE)
if (bufferSizeKb >= ByteUnit.GiB.toKiB(2)) {
throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than " +
throw new IllegalArgumentException(s"${KRYO_SERIALIZER_BUFFER_SIZE.key} must be less than " +
s"2048 MiB, got: + ${ByteUnit.KiB.toMiB(bufferSizeKb)} MiB.")
}
private val bufferSize = ByteUnit.KiB.toBytes(bufferSizeKb).toInt
val maxBufferSizeMb = conf.getSizeAsMb("spark.kryoserializer.buffer.max", "64m").toInt
val maxBufferSizeMb = conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE).toInt
if (maxBufferSizeMb >= ByteUnit.GiB.toMiB(2)) {
throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than " +
s"2048 MiB, got: + $maxBufferSizeMb MiB.")
throw new IllegalArgumentException(s"${KRYO_SERIALIZER_MAX_BUFFER_SIZE.key} must be less " +
s"than 2048 MiB, got: $maxBufferSizeMb MiB.")
}
private val maxBufferSize = ByteUnit.MiB.toBytes(maxBufferSizeMb).toInt
private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
private val userRegistrators = conf.get("spark.kryo.registrator", "")
.split(',').map(_.trim)
private val referenceTracking = conf.get(KRYO_REFERENCE_TRACKING)
private val registrationRequired = conf.get(KRYO_REGISTRATION_REQUIRED)
private val userRegistrators = conf.get(KRYO_USER_REGISTRATORS)
.map(_.trim)
.filter(!_.isEmpty)
private val classesToRegister = conf.get("spark.kryo.classesToRegister", "")
.split(',').map(_.trim)
private val classesToRegister = conf.get(KRYO_CLASSES_TO_REGISTER)
.map(_.trim)
.filter(!_.isEmpty)
private val avroSchemas = conf.getAvroSchema
// whether to use unsafe based IO for serialization
private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
private val usePool = conf.getBoolean("spark.kryo.pool", true)
private val useUnsafe = conf.get(KRYO_USE_UNSAFE)
private val usePool = conf.get(KRYO_USE_POOL)
def newKryoOutput(): KryoOutput =
if (useUnsafe) {
@ -407,7 +408,7 @@ private[spark] class KryoSerializerInstance(
} catch {
case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
"increase spark.kryoserializer.buffer.max value.", e)
s"increase ${KRYO_SERIALIZER_MAX_BUFFER_SIZE.key} value.", e)
} finally {
releaseKryo(kryo)
}

View file

@ -62,6 +62,7 @@ import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.internal.config.UI._
import org.apache.spark.internal.config.Worker._
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
@ -1457,16 +1458,12 @@ private[spark] object Utils extends Logging {
CallSite(shortForm, longForm)
}
private val UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF =
"spark.worker.ui.compressedLogFileLengthCacheSize"
private val DEFAULT_UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE = 100
private var compressedLogFileLengthCache: LoadingCache[String, java.lang.Long] = null
private def getCompressedLogFileLengthCache(
sparkConf: SparkConf): LoadingCache[String, java.lang.Long] = this.synchronized {
if (compressedLogFileLengthCache == null) {
val compressedLogFileLengthCacheSize = sparkConf.getInt(
UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF,
DEFAULT_UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE)
val compressedLogFileLengthCacheSize = sparkConf.get(
UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF)
compressedLogFileLengthCache = CacheBuilder.newBuilder()
.maximumSize(compressedLogFileLengthCacheSize)
.build[String, java.lang.Long](new CacheLoader[String, java.lang.Long]() {
@ -2535,8 +2532,7 @@ private[spark] object Utils extends Logging {
* has its own mechanism to distribute jars.
*/
def getUserJars(conf: SparkConf): Seq[String] = {
val sparkJars = conf.getOption("spark.jars")
sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
conf.get(JARS).filter(_.nonEmpty)
}
/**

View file

@ -27,6 +27,7 @@ import scala.concurrent.duration._
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
import org.apache.spark.internal.config.Deploy._
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd, SparkListenerTaskStart}
import org.apache.spark.util.ThreadUtils
@ -256,7 +257,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
.set("spark.task.reaper.enabled", "true")
.set("spark.task.reaper.killTimeout", "-1")
.set("spark.task.reaper.PollingInterval", "1s")
.set("spark.deploy.maxExecutorRetries", "1")
.set(MAX_EXECUTOR_RETRIES, 1)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
// Add a listener to release the semaphore once any tasks are launched.

View file

@ -23,6 +23,7 @@ import java.util.concurrent.{Callable, CyclicBarrier, Executors, ExecutorService
import org.scalatest.Matchers
import org.apache.spark.ShuffleSuite.NonJavaSerializableClass
import org.apache.spark.internal.config.SERIALIZER
import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD}
@ -215,7 +216,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
test("sort with Java non serializable class - Kryo") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
val myConf = conf.clone().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val myConf = conf.clone().set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
sc = new SparkContext("local-cluster[2,1,1024]", "test", myConf)
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>

View file

@ -28,6 +28,7 @@ import com.esotericsoftware.kryo.Kryo
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer}
import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
@ -78,7 +79,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
assert(conf.get("spark.master") === "local[3]")
assert(conf.get("spark.app.name") === "My app")
assert(conf.get("spark.home") === "/path")
assert(conf.get("spark.jars") === "a.jar,b.jar")
assert(conf.get(JARS) === Seq("a.jar", "b.jar"))
assert(conf.get("spark.executorEnv.VAR1") === "value1")
assert(conf.get("spark.executorEnv.VAR2") === "value2")
assert(conf.get("spark.executorEnv.VAR3") === "value3")
@ -86,7 +87,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
// Test the Java-friendly versions of these too
conf.setJars(Array("c.jar", "d.jar"))
conf.setExecutorEnv(Array(("VAR4", "value4"), ("VAR5", "value5")))
assert(conf.get("spark.jars") === "c.jar,d.jar")
assert(conf.get(JARS) === Seq("c.jar", "d.jar"))
assert(conf.get("spark.executorEnv.VAR4") === "value4")
assert(conf.get("spark.executorEnv.VAR5") === "value5")
}
@ -182,19 +183,19 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
}
test("register kryo classes through registerKryoClasses") {
val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
val conf = new SparkConf().set(KRYO_REGISTRATION_REQUIRED, true)
conf.registerKryoClasses(Array(classOf[Class1], classOf[Class2]))
assert(conf.get("spark.kryo.classesToRegister") ===
classOf[Class1].getName + "," + classOf[Class2].getName)
assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet ===
Seq(classOf[Class1].getName, classOf[Class2].getName).toSet)
conf.registerKryoClasses(Array(classOf[Class3]))
assert(conf.get("spark.kryo.classesToRegister") ===
classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName)
assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet ===
Seq(classOf[Class1].getName, classOf[Class2].getName, classOf[Class3].getName).toSet)
conf.registerKryoClasses(Array(classOf[Class2]))
assert(conf.get("spark.kryo.classesToRegister") ===
classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName)
assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet ===
Seq(classOf[Class1].getName, classOf[Class2].getName, classOf[Class3].getName).toSet)
// Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
// blow up.
@ -205,12 +206,12 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
}
test("register kryo classes through registerKryoClasses and custom registrator") {
val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
val conf = new SparkConf().set(KRYO_REGISTRATION_REQUIRED, true)
conf.registerKryoClasses(Array(classOf[Class1]))
assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName)
assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet === Seq(classOf[Class1].getName).toSet)
conf.set("spark.kryo.registrator", classOf[CustomRegistrator].getName)
conf.set(KRYO_USER_REGISTRATORS, classOf[CustomRegistrator].getName)
// Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
// blow up.
@ -220,9 +221,9 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
}
test("register kryo classes through conf") {
val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
conf.set("spark.kryo.classesToRegister", "java.lang.StringBuffer")
conf.set("spark.serializer", classOf[KryoSerializer].getName)
val conf = new SparkConf().set(KRYO_REGISTRATION_REQUIRED, true)
conf.set(KRYO_CLASSES_TO_REGISTER, Seq("java.lang.StringBuffer"))
conf.set(SERIALIZER, classOf[KryoSerializer].getName)
// Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
// blow up.

View file

@ -24,6 +24,7 @@ import scala.io.Source
import org.scalatest.Matchers
import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils
@ -48,7 +49,7 @@ class PythonBroadcastSuite extends SparkFunSuite with Matchers with SharedSparkC
}
val broadcast = new PythonBroadcast(broadcastDataFile.getAbsolutePath)
assertBroadcastIsValid(broadcast)
val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
val conf = new SparkConf().set(KRYO_REGISTRATION_REQUIRED, true)
val deserializedBroadcast =
Utils.clone[PythonBroadcast](broadcast, new KryoSerializer(conf).newInstance())
assertBroadcastIsValid(deserializedBroadcast)

View file

@ -25,6 +25,7 @@ import org.scalatest.Assertions
import org.apache.spark._
import org.apache.spark.internal.config
import org.apache.spark.internal.config.SERIALIZER
import org.apache.spark.io.SnappyCompressionCodec
import org.apache.spark.rdd.RDD
import org.apache.spark.security.EncryptionFunSuite
@ -68,7 +69,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
encryptionTest("Accessing TorrentBroadcast variables in a local cluster") { conf =>
val numSlaves = 4
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
conf.set(config.BROADCAST_COMPRESS, true)
sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", conf)
val list = List[Int](1, 2, 3, 4)

View file

@ -221,7 +221,7 @@ class SparkSubmitSuite
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
appArgs.deployMode should be ("client")
conf.get("spark.submit.deployMode") should be ("client")
conf.get(SUBMIT_DEPLOY_MODE) should be ("client")
// Both cmd line and configuration are specified, cmdline option takes the priority
val clArgs1 = Seq(
@ -235,7 +235,7 @@ class SparkSubmitSuite
val (_, _, conf1, _) = submit.prepareSubmitEnvironment(appArgs1)
appArgs1.deployMode should be ("cluster")
conf1.get("spark.submit.deployMode") should be ("cluster")
conf1.get(SUBMIT_DEPLOY_MODE) should be ("cluster")
// Neither cmdline nor configuration are specified, client mode is the default choice
val clArgs2 = Seq(
@ -248,7 +248,7 @@ class SparkSubmitSuite
val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
appArgs2.deployMode should be ("client")
conf2.get("spark.submit.deployMode") should be ("client")
conf2.get(SUBMIT_DEPLOY_MODE) should be ("client")
}
test("handles YARN cluster mode") {
@ -374,12 +374,12 @@ class SparkSubmitSuite
val confMap = conf.getAll.toMap
confMap.keys should contain ("spark.master")
confMap.keys should contain ("spark.app.name")
confMap.keys should contain ("spark.jars")
confMap.keys should contain (JARS.key)
confMap.keys should contain ("spark.driver.memory")
confMap.keys should contain ("spark.driver.cores")
confMap.keys should contain ("spark.driver.supervise")
confMap.keys should contain (UI_ENABLED.key)
confMap.keys should contain ("spark.submit.deployMode")
confMap.keys should contain (SUBMIT_DEPLOY_MODE.key)
conf.get(UI_ENABLED) should be (false)
}
@ -467,7 +467,7 @@ class SparkSubmitSuite
val (_, _, conf, mainClass) = submit.prepareSubmitEnvironment(appArgs)
conf.get("spark.executor.memory") should be ("5g")
conf.get("spark.master") should be ("yarn")
conf.get("spark.submit.deployMode") should be ("cluster")
conf.get(SUBMIT_DEPLOY_MODE) should be ("cluster")
mainClass should be (SparkSubmit.YARN_CLUSTER_SUBMIT_CLASS)
}
@ -662,7 +662,7 @@ class SparkSubmitSuite
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
appArgs.jars should be(Utils.resolveURIs(jars))
appArgs.files should be(Utils.resolveURIs(files))
conf.get("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
conf.get(JARS) should be(Utils.resolveURIs(jars + ",thejar.jar").split(",").toSeq)
conf.get("spark.files") should be(Utils.resolveURIs(files))
// Test files and archives (Yarn)
@ -692,8 +692,8 @@ class SparkSubmitSuite
val appArgs3 = new SparkSubmitArguments(clArgs3)
val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3)
appArgs3.pyFiles should be(Utils.resolveURIs(pyFiles))
conf3.get("spark.submit.pyFiles") should be(
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
conf3.get(SUBMIT_PYTHON_FILES) should be(
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)))
conf3.get(PYSPARK_DRIVER_PYTHON.key) should be("python3.4")
conf3.get(PYSPARK_PYTHON.key) should be("python3.5")
}
@ -744,8 +744,8 @@ class SparkSubmitSuite
)
val appArgs = new SparkSubmitArguments(clArgs)
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
conf.get("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
conf.get("spark.files") should be(Utils.resolveURIs(files))
conf.get(JARS) should be(Utils.resolveURIs(jars + ",thejar.jar").split(",").toSeq)
conf.get(FILES) should be(Utils.resolveURIs(files).split(",").toSeq)
// Test files and archives (Yarn)
val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir)
@ -776,8 +776,8 @@ class SparkSubmitSuite
)
val appArgs3 = new SparkSubmitArguments(clArgs3)
val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3)
conf3.get("spark.submit.pyFiles") should be(
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
conf3.get(SUBMIT_PYTHON_FILES) should be(
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)))
// Test remote python files
val hadoopConf = new Configuration()
@ -798,7 +798,7 @@ class SparkSubmitSuite
val appArgs4 = new SparkSubmitArguments(clArgs4)
val (_, _, conf4, _) = submit.prepareSubmitEnvironment(appArgs4, conf = Some(hadoopConf))
// Should not format python path for yarn cluster mode
conf4.get("spark.submit.pyFiles") should be(Utils.resolveURIs(remotePyFiles))
conf4.get(SUBMIT_PYTHON_FILES) should be(Utils.resolveURIs(remotePyFiles).split(","))
}
}
@ -1024,7 +1024,7 @@ class SparkSubmitSuite
conf.get("spark.repl.local.jars") should (startWith("file:"))
// local py files should not be a URI format.
conf.get("spark.submit.pyFiles") should (startWith("/"))
conf.get(SUBMIT_PYTHON_FILES).foreach { _ should (startWith("/")) }
}
}
@ -1155,7 +1155,7 @@ class SparkSubmitSuite
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf))
conf.get(PY_FILES.key) should be(s"s3a://${pyFile.getAbsolutePath}")
conf.get("spark.submit.pyFiles") should (startWith("/"))
conf.get(SUBMIT_PYTHON_FILES).foreach { _ should (startWith("/")) }
// Verify "spark.submit.pyFiles"
val args1 = Seq(
@ -1171,7 +1171,7 @@ class SparkSubmitSuite
val (_, _, conf1, _) = submit.prepareSubmitEnvironment(appArgs1, conf = Some(hadoopConf))
conf1.get(PY_FILES.key) should be(s"s3a://${pyFile.getAbsolutePath}")
conf1.get("spark.submit.pyFiles") should (startWith("/"))
conf.get(SUBMIT_PYTHON_FILES).foreach { _ should (startWith("/")) }
}
}

View file

@ -40,7 +40,9 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy._
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Deploy._
import org.apache.spark.internal.config.UI._
import org.apache.spark.internal.config.Worker._
import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.serializer
@ -103,9 +105,8 @@ class MasterSuite extends SparkFunSuite
test("can use a custom recovery mode factory") {
val conf = new SparkConf(loadDefaults = false)
conf.set("spark.deploy.recoveryMode", "CUSTOM")
conf.set("spark.deploy.recoveryMode.factory",
classOf[CustomRecoveryModeFactory].getCanonicalName)
conf.set(RECOVERY_MODE, "CUSTOM")
conf.set(RECOVERY_MODE_FACTORY, classOf[CustomRecoveryModeFactory].getCanonicalName)
conf.set(MASTER_REST_SERVER_ENABLED, false)
val instantiationAttempts = CustomRecoveryModeFactory.instantiationAttempts
@ -188,9 +189,8 @@ class MasterSuite extends SparkFunSuite
test("master correctly recover the application") {
val conf = new SparkConf(loadDefaults = false)
conf.set("spark.deploy.recoveryMode", "CUSTOM")
conf.set("spark.deploy.recoveryMode.factory",
classOf[FakeRecoveryModeFactory].getCanonicalName)
conf.set(RECOVERY_MODE, "CUSTOM")
conf.set(RECOVERY_MODE_FACTORY, classOf[FakeRecoveryModeFactory].getCanonicalName)
conf.set(MASTER_REST_SERVER_ENABLED, false)
val fakeAppInfo = makeAppInfo(1024)
@ -637,7 +637,7 @@ class MasterSuite extends SparkFunSuite
}
test("SPARK-19900: there should be a corresponding driver for the app after relaunching driver") {
val conf = new SparkConf().set("spark.worker.timeout", "1")
val conf = new SparkConf().set(WORKER_TIMEOUT, 1L)
val master = makeMaster(conf)
master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
eventually(timeout(10.seconds)) {

View file

@ -24,6 +24,7 @@ import org.apache.commons.lang3.RandomUtils
import org.apache.curator.test.TestingServer
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL
import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}
import org.apache.spark.serializer.{JavaSerializer, Serializer}
import org.apache.spark.util.Utils
@ -48,7 +49,7 @@ class PersistenceEngineSuite extends SparkFunSuite {
val zkTestServer = new TestingServer(findFreePort(conf))
try {
testPersistenceEngine(conf, serializer => {
conf.set("spark.deploy.zookeeper.url", zkTestServer.getConnectString)
conf.set(ZOOKEEPER_URL, zkTestServer.getConnectString)
new ZooKeeperPersistenceEngine(conf, serializer)
})
} finally {

View file

@ -22,6 +22,7 @@ import java.lang.Boolean
import org.json4s.jackson.JsonMethods._
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils
/**
@ -93,7 +94,7 @@ class SubmitRestProtocolSuite extends SparkFunSuite {
message.sparkProperties = conf.getAll.toMap
message.validate()
// optional fields
conf.set("spark.jars", "mayonnaise.jar,ketchup.jar")
conf.set(JARS, Seq("mayonnaise.jar", "ketchup.jar"))
conf.set("spark.files", "fireball.png")
conf.set("spark.driver.memory", s"${Utils.DEFAULT_DRIVER_MEM_MB}m")
conf.set("spark.driver.cores", "180")

View file

@ -32,6 +32,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.internal.config.Worker._
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
@ -100,7 +101,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
test("test clearing of finishedExecutors (small number of executors)") {
val conf = new SparkConf()
conf.set("spark.worker.ui.retainedExecutors", 2.toString)
conf.set(WORKER_UI_RETAINED_EXECUTORS, 2)
val worker = makeWorker(conf)
// initialize workers
for (i <- 0 until 5) {
@ -124,7 +125,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
test("test clearing of finishedExecutors (more executors)") {
val conf = new SparkConf()
conf.set("spark.worker.ui.retainedExecutors", 30.toString)
conf.set(WORKER_UI_RETAINED_EXECUTORS, 30)
val worker = makeWorker(conf)
// initialize workers
for (i <- 0 until 50) {
@ -157,7 +158,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
test("test clearing of finishedDrivers (small number of drivers)") {
val conf = new SparkConf()
conf.set("spark.worker.ui.retainedDrivers", 2.toString)
conf.set(WORKER_UI_RETAINED_DRIVERS, 2)
val worker = makeWorker(conf)
// initialize workers
for (i <- 0 until 5) {
@ -181,7 +182,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
test("test clearing of finishedDrivers (more drivers)") {
val conf = new SparkConf()
conf.set("spark.worker.ui.retainedDrivers", 30.toString)
conf.set(WORKER_UI_RETAINED_DRIVERS, 30)
val worker = makeWorker(conf)
// initialize workers
for (i <- 0 until 50) {

View file

@ -181,7 +181,7 @@ class MapStatusSuite extends SparkFunSuite {
test("SPARK-21133 HighlyCompressedMapStatus#writeExternal throws NPE") {
val conf = new SparkConf()
.set("spark.serializer", classOf[KryoSerializer].getName)
.set(config.SERIALIZER, classOf[KryoSerializer].getName)
.setMaster("local")
.setAppName("SPARK-21133")
withSpark(new SparkContext(conf)) { sc =>

View file

@ -25,9 +25,10 @@ import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.GenericData.Record
import org.apache.spark.{SharedSparkContext, SparkFunSuite}
import org.apache.spark.internal.config.SERIALIZER
class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
val schema : Schema = SchemaBuilder
.record("testRecord").fields()

View file

@ -22,6 +22,8 @@ import scala.util.Random
import org.apache.spark.SparkConf
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.serializer.KryoTest._
/**
@ -122,9 +124,9 @@ object KryoBenchmark extends BenchmarkBase {
def createSerializer(useUnsafe: Boolean): SerializerInstance = {
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
conf.set("spark.kryo.unsafe", useUnsafe.toString)
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
conf.set(KRYO_USE_UNSAFE, useUnsafe)
new KryoSerializer(conf).newInstance()
}
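
The benchmark now toggles Kryo options with real `Boolean`s instead of `useUnsafe.toString`. A sketch of the boolean-entry shape this relies on (the defaults below are assumptions):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder

// Illustrative stand-ins for the entries in org.apache.spark.internal.config.Kryo.
val KRYO_USE_UNSAFE = ConfigBuilder("spark.kryo.unsafe").booleanConf.createWithDefault(false)
val KRYO_USE_POOL   = ConfigBuilder("spark.kryo.pool").booleanConf.createWithDefault(true)

val conf = new SparkConf(false)
conf.set(KRYO_USE_UNSAFE, true)
conf.set(KRYO_USE_POOL, false)
val useUnsafe: Boolean = conf.get(KRYO_USE_UNSAFE)
```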

View file

@ -23,6 +23,8 @@ import scala.concurrent.duration._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.serializer.KryoTest._
import org.apache.spark.util.ThreadUtils
@ -69,9 +71,9 @@ object KryoSerializerBenchmark extends BenchmarkBase {
def createSparkContext(usePool: Boolean): SparkContext = {
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
conf.set("spark.kryo.pool", usePool.toString)
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
conf.set(KRYO_USE_POOL, usePool)
if (sc != null) {
sc.stop()

View file

@ -28,8 +28,8 @@ class KryoSerializerDistributedSuite extends SparkFunSuite with LocalSparkContex
test("kryo objects are serialised consistently in different processes") {
val conf = new SparkConf(false)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", classOf[AppJarRegistrator].getName)
.set(config.SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
.set(config.Kryo.KRYO_USER_REGISTRATORS, classOf[AppJarRegistrator].getName)
.set(config.MAX_TASK_FAILURES, 1)
.set(config.BLACKLIST_ENABLED, false)

View file

@ -21,6 +21,8 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.LocalSparkContext._
import org.apache.spark.SparkContext
import org.apache.spark.SparkException
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Kryo._
class KryoSerializerResizableOutputSuite extends SparkFunSuite {
@ -29,9 +31,9 @@ class KryoSerializerResizableOutputSuite extends SparkFunSuite {
test("kryo without resizable output buffer should fail on large array") {
val conf = new SparkConf(false)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryoserializer.buffer", "1m")
conf.set("spark.kryoserializer.buffer.max", "1m")
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
conf.set(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "1m")
withSpark(new SparkContext("local", "test", conf)) { sc =>
intercept[SparkException](sc.parallelize(x).collect())
}
@ -39,9 +41,9 @@ class KryoSerializerResizableOutputSuite extends SparkFunSuite {
test("kryo with resizable output buffer should succeed on large array") {
val conf = new SparkConf(false)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryoserializer.buffer", "1m")
conf.set("spark.kryoserializer.buffer.max", "2m")
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
conf.set(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "2m")
withSpark(new SparkContext("local", "test", conf)) { sc =>
assert(sc.parallelize(x).collect() === x)
}
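
These two tests keep using `.key` with a string value ("1m", "2m") because the buffer entries are byte-size configs: setting through the raw key preserves the human-readable size notation, while the typed getter returns a number in the entry's unit. A sketch, with the unit and default treated as assumptions:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

// Illustrative stand-in for spark.kryoserializer.buffer.
val KRYO_SERIALIZER_BUFFER_SIZE = ConfigBuilder("spark.kryoserializer.buffer")
  .bytesConf(ByteUnit.KiB)         // assumed unit
  .createWithDefaultString("64k")  // assumed default

val conf = new SparkConf(false)
conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")            // raw key keeps the "1m" notation
val bufferKb: Long = conf.get(KRYO_SERIALIZER_BUFFER_SIZE) // 1024 (parsed into KiB)
```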

View file

@ -32,19 +32,21 @@ import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import org.roaringbitmap.RoaringBitmap
import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.scheduler.HighlyCompressedMapStatus
import org.apache.spark.serializer.KryoTest._
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{ThreadUtils, Utils}
class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
conf.set("spark.kryo.unsafe", "false")
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
conf.set(KRYO_USE_UNSAFE, false)
test("SPARK-7392 configuration limits") {
val kryoBufferProperty = "spark.kryoserializer.buffer"
val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max"
val kryoBufferProperty = KRYO_SERIALIZER_BUFFER_SIZE.key
val kryoBufferMaxProperty = KRYO_SERIALIZER_MAX_BUFFER_SIZE.key
def newKryoInstance(
conf: SparkConf,
@ -81,7 +83,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
test("basic types") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")
conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()
def check[T: ClassTag](t: T) {
@ -114,7 +116,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
test("pairs") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")
conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()
def check[T: ClassTag](t: T) {
@ -141,7 +143,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
test("Scala data structures") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")
conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()
def check[T: ClassTag](t: T) {
@ -169,7 +171,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
}
test("Bug: SPARK-10251") {
val ser = new KryoSerializer(conf.clone.set("spark.kryo.registrationRequired", "true"))
val ser = new KryoSerializer(conf.clone.set(KRYO_REGISTRATION_REQUIRED, true))
.newInstance()
def check[T: ClassTag](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
@ -253,7 +255,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
hashMap.put("foo", "bar")
check(hashMap)
System.clearProperty("spark.kryo.registrator")
System.clearProperty(KRYO_USER_REGISTRATORS.key)
}
test("kryo with collect") {
@ -310,7 +312,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
import org.apache.spark.SparkException
val conf = new SparkConf(false)
conf.set("spark.kryo.registrator", "this.class.does.not.exist")
conf.set(KRYO_USER_REGISTRATORS, "this.class.does.not.exist")
val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance().serialize(1))
assert(thrown.getMessage.contains("Failed to register classes with Kryo"))
@ -337,7 +339,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
test("registration of HighlyCompressedMapStatus") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")
conf.set(KRYO_REGISTRATION_REQUIRED, true)
// these cases require knowing the internals of RoaringBitmap a little. Blocks span 2^16
// values, and they use a bitmap (dense) if they have more than 4096 values, and an
@ -355,7 +357,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
test("serialization buffer overflow reporting") {
import org.apache.spark.SparkException
val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max"
val kryoBufferMaxProperty = KRYO_SERIALIZER_MAX_BUFFER_SIZE.key
val largeObject = (1 to 1000000).toArray
@ -409,7 +411,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
test("getAutoReset") {
val ser = new KryoSerializer(new SparkConf).newInstance().asInstanceOf[KryoSerializerInstance]
assert(ser.getAutoReset)
val conf = new SparkConf().set("spark.kryo.registrator",
val conf = new SparkConf().set(KRYO_USER_REGISTRATORS,
classOf[RegistratorWithoutAutoReset].getName)
val ser2 = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance]
assert(!ser2.getAutoReset)
@ -438,10 +440,10 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
private def testSerializerInstanceReuse(
autoReset: Boolean, referenceTracking: Boolean, usePool: Boolean): Unit = {
val conf = new SparkConf(loadDefaults = false)
.set("spark.kryo.referenceTracking", referenceTracking.toString)
.set("spark.kryo.pool", usePool.toString)
.set(KRYO_REFERENCE_TRACKING, referenceTracking)
.set(KRYO_USE_POOL, usePool)
if (!autoReset) {
conf.set("spark.kryo.registrator", classOf[RegistratorWithoutAutoReset].getName)
conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName)
}
val ser = new KryoSerializer(conf)
val serInstance = ser.newInstance().asInstanceOf[KryoSerializerInstance]
@ -478,7 +480,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(
Executors.newFixedThreadPool(4))
val ser = new KryoSerializer(conf.clone.set("spark.kryo.pool", "true"))
val ser = new KryoSerializer(conf.clone.set(KRYO_USE_POOL, true))
val tests = mutable.ListBuffer[Future[Boolean]]()
@ -519,9 +521,9 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
}
class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSparkContext {
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("spark.kryo.registrator", classOf[RegistratorWithoutAutoReset].getName)
conf.set("spark.kryo.referenceTracking", "true")
conf.set(SERIALIZER, classOf[KryoSerializer].getName)
conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName)
conf.set(KRYO_REFERENCE_TRACKING, true)
conf.set("spark.shuffle.manager", "sort")
conf.set("spark.shuffle.sort.bypassMergeThreshold", "200")

View file

@ -24,6 +24,7 @@ import scala.util.Random
import org.scalatest.Assertions
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.serializer.KryoTest.RegistratorWithoutAutoReset
/**
@ -50,7 +51,7 @@ class SerializerPropertiesSuite extends SparkFunSuite {
}
test("KryoSerializer does not support relocation when auto-reset is disabled") {
val conf = new SparkConf().set("spark.kryo.registrator",
val conf = new SparkConf().set(KRYO_USER_REGISTRATORS,
classOf[RegistratorWithoutAutoReset].getName)
val ser = new KryoSerializer(conf)
assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())

View file

@ -17,17 +17,19 @@
package org.apache.spark.serializer
import org.apache.spark.internal.config.Kryo._
class UnsafeKryoSerializerSuite extends KryoSerializerSuite {
// This test suite should run all tests in KryoSerializerSuite with kryo unsafe.
override def beforeAll() {
conf.set("spark.kryo.unsafe", "true")
conf.set(KRYO_USE_UNSAFE, true)
super.beforeAll()
}
override def afterAll() {
conf.set("spark.kryo.unsafe", "false")
conf.set(KRYO_USE_UNSAFE, false)
super.afterAll()
}
}

View file

@ -17,7 +17,7 @@
package org.apache.spark.storage
import org.apache.spark._
import org.apache.spark.internal.config._
class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
/* Tests the ability of Spark to deal with user provided iterators from flatMap
@ -55,7 +55,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
test("Serializer Reset") {
val sconf = new SparkConf().setMaster("local").setAppName("serializer_reset_test")
.set("spark.serializer.objectStreamReset", "10")
.set(SERIALIZER_OBJECT_STREAM_RESET, 10)
sc = new SparkContext(sconf)
val expand_size = 500
val data = sc.parallelize(Seq(1, 2)).

View file

@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.SparkListener
@ -829,7 +830,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
test("isDynamicAllocationEnabled") {
val conf = new SparkConf()
conf.set("spark.master", "yarn")
conf.set("spark.submit.deployMode", "client")
conf.set(SUBMIT_DEPLOY_MODE, "client")
assert(Utils.isDynamicAllocationEnabled(conf) === false)
assert(Utils.isDynamicAllocationEnabled(
conf.set("spark.dynamicAllocation.enabled", "false")) === false)

View file

@ -54,8 +54,8 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
val conf = new SparkConf(loadDefaults)
// Make the Java serializer write a reset instruction (TC_RESET) after each object to test
// for a bug we had with bytes written past the last object in a batch (SPARK-2792)
conf.set("spark.serializer.objectStreamReset", "1")
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
conf.set(SERIALIZER_OBJECT_STREAM_RESET, 1)
conf.set(SERIALIZER, "org.apache.spark.serializer.JavaSerializer")
conf.set("spark.shuffle.spill.compress", codec.isDefined.toString)
conf.set("spark.shuffle.compress", codec.isDefined.toString)
codec.foreach { c => conf.set(IO_COMPRESSION_CODEC, c) }
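
`spark.serializer.objectStreamReset` is apparently an int-typed entry now, so the "reset after every object" trick is expressed as `1` rather than `"1"`. A sketch of the two serializer-related entries this test touches, with the defaults treated as assumptions:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.serializer.JavaSerializer

// Illustrative stand-ins for the entries in org.apache.spark.internal.config.
val SERIALIZER_OBJECT_STREAM_RESET = ConfigBuilder("spark.serializer.objectStreamReset")
  .intConf
  .createWithDefault(100)   // assumed default

val SERIALIZER = ConfigBuilder("spark.serializer")
  .stringConf
  .createWithDefault(classOf[JavaSerializer].getName)

val conf = new SparkConf(false)
conf.set(SERIALIZER_OBJECT_STREAM_RESET, 1)
conf.set(SERIALIZER, classOf[JavaSerializer].getName)
```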

View file

@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import org.apache.spark._
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests.TEST_MEMORY
import org.apache.spark.memory.MemoryTestingUtils
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
@ -268,12 +269,12 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
private def createSparkConf(loadDefaults: Boolean, kryo: Boolean): SparkConf = {
val conf = new SparkConf(loadDefaults)
if (kryo) {
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set(SERIALIZER, classOf[KryoSerializer].getName)
} else {
// Make the Java serializer write a reset instruction (TC_RESET) after each object to test
// for a bug we had with bytes written past the last object in a batch (SPARK-2792)
conf.set("spark.serializer.objectStreamReset", "1")
conf.set("spark.serializer", classOf[JavaSerializer].getName)
conf.set(SERIALIZER_OBJECT_STREAM_RESET, 1)
conf.set(SERIALIZER, classOf[JavaSerializer].getName)
}
conf.set("spark.shuffle.sort.bypassMergeThreshold", "0")
// Ensure that we actually have multiple batches per spill file

View file

@ -18,6 +18,7 @@
package org.apache.spark.ml.attribute
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.types._
@ -225,7 +226,7 @@ class AttributeSuite extends SparkFunSuite {
test("Kryo class register") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")
conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()

View file

@ -18,13 +18,14 @@
package org.apache.spark.ml.feature
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.serializer.KryoSerializer
class InstanceSuite extends SparkFunSuite {
test("Kryo class register") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")
conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()

View file

@ -18,13 +18,14 @@
package org.apache.spark.ml.feature
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.serializer.KryoSerializer
class LabeledPointSuite extends SparkFunSuite {
test("Kryo class register") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")
conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()

View file

@ -18,12 +18,13 @@
package org.apache.spark.ml.tree.impl
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.serializer.KryoSerializer
class TreePointSuite extends SparkFunSuite {
test("Kryo class register") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")
conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()

View file

@ -20,6 +20,7 @@ package org.apache.spark.mllib.clustering
import scala.util.Random
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
import org.apache.spark.mllib.util.TestingUtils._
@ -316,7 +317,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
test("Kryo class register") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")
conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()

View file

@ -18,8 +18,10 @@
package org.apache.spark.mllib.feature
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.util.Utils
class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
@ -109,12 +111,16 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
test("big model load / save") {
// backing up old values
val oldBufferConfValue = spark.conf.get("spark.kryoserializer.buffer.max", "64m")
val oldBufferMaxConfValue = spark.conf.get("spark.kryoserializer.buffer", "64k")
val oldBufferConfValue = spark.conf.get(KRYO_SERIALIZER_BUFFER_SIZE.key, "64m")
val oldBufferMaxConfValue = spark.conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "64k")
val oldSetCommandRejectsSparkCoreConfs = spark.conf.get(
SET_COMMAND_REJECTS_SPARK_CORE_CONFS.key, "true")
// setting test values to trigger partitioning
spark.conf.set("spark.kryoserializer.buffer", "50b")
spark.conf.set("spark.kryoserializer.buffer.max", "50b")
// this is needed to set configurations which are also defined to SparkConf
spark.conf.set(SET_COMMAND_REJECTS_SPARK_CORE_CONFS.key, "false")
spark.conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, "50b")
spark.conf.set(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "50b")
// create a model bigger than 50 Bytes
val word2VecMap = Map((0 to 10).map(i => s"$i" -> Array.fill(10)(0.1f)): _*)
@ -137,8 +143,9 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
"that spans over multiple partitions", t)
} finally {
Utils.deleteRecursively(tempDir)
spark.conf.set("spark.kryoserializer.buffer", oldBufferConfValue)
spark.conf.set("spark.kryoserializer.buffer.max", oldBufferMaxConfValue)
spark.conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, oldBufferConfValue)
spark.conf.set(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, oldBufferMaxConfValue)
spark.conf.set(SET_COMMAND_REJECTS_SPARK_CORE_CONFS.key, oldSetCommandRejectsSparkCoreConfs)
}
}
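
This test goes through `spark.conf` (the SQL `RuntimeConfig`), which is keyed by plain strings, so `.key` is used throughout; it also has to relax `SET_COMMAND_REJECTS_SPARK_CORE_CONFS` before a core setting can be changed at that layer, as the added comment notes. A condensed sketch of the save/tweak/restore pattern, under the assumption that the session and keys behave as shown in the diff:

```scala
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf.SET_COMMAND_REJECTS_SPARK_CORE_CONFS

val spark = SparkSession.builder().master("local").appName("conf-sketch").getOrCreate()

val oldBuffer = spark.conf.get(KRYO_SERIALIZER_BUFFER_SIZE.key, "64k")
spark.conf.set(SET_COMMAND_REJECTS_SPARK_CORE_CONFS.key, "false") // allow touching a core conf here
try {
  spark.conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, "50b")
  // ... exercise the code that should observe the tiny buffer ...
} finally {
  spark.conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, oldBuffer)
  spark.stop()
}
```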

View file

@ -27,6 +27,7 @@ import org.mockito.Mockito.when
import org.scalatest.mockito.MockitoSugar._
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.serializer.KryoSerializer
@ -34,7 +35,7 @@ import org.apache.spark.serializer.KryoSerializer
class MatricesSuite extends SparkFunSuite {
test("kryo class register") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")
conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()

View file

@ -25,6 +25,7 @@ import org.json4s.jackson.JsonMethods.{parse => parseJson}
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.serializer.KryoSerializer
@ -38,7 +39,7 @@ class VectorsSuite extends SparkFunSuite with Logging {
test("kryo class register") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")
conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()
def check[T: ClassTag](t: T) {

View file

@ -18,6 +18,7 @@
package org.apache.spark.mllib.regression
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.ml.feature.{LabeledPoint => NewLabeledPoint}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.serializer.KryoSerializer
@ -57,7 +58,7 @@ class LabeledPointSuite extends SparkFunSuite {
test("Kryo class register") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")
conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()

View file

@ -18,6 +18,7 @@
package org.apache.spark.mllib.stat.distribution
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
@ -83,7 +84,7 @@ class MultivariateGaussianSuite extends SparkFunSuite with MLlibTestSparkContext
test("Kryo class register") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")
conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()

View file

@ -154,12 +154,11 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
Seq("spark.jars", "spark.files").foreach { key =>
conf.getOption(key).foreach { value =>
val resolved = KubernetesUtils.resolveFileUrisAndPath(Utils.stringToSeq(value))
if (resolved.nonEmpty) {
additionalProps.put(key, resolved.mkString(","))
}
Seq(JARS, FILES).foreach { key =>
val value = conf.get(key)
val resolved = KubernetesUtils.resolveFileUrisAndPath(value)
if (resolved.nonEmpty) {
additionalProps.put(key.key, resolved.mkString(","))
}
}
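
The loop now iterates over the entries themselves, reads each one as an already-parsed `Seq[String]`, and only drops back to `entry.key` when writing the resolved values into the plain string property map. A compact sketch of that shape with a stand-in resolver (the real `KubernetesUtils.resolveFileUrisAndPath` and the Kubernetes conf wrapper are not reproduced here):

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.{FILES, JARS}

// Placeholder for KubernetesUtils.resolveFileUrisAndPath.
def resolveFileUrisAndPath(uris: Seq[String]): Seq[String] =
  uris.map(_.stripPrefix("local://"))

val conf = new SparkConf(false).set(JARS, Seq("local:///opt/spark/jar1.jar", "hdfs:///jar2.jar"))

val additionalProps = mutable.Map.empty[String, String]
Seq(JARS, FILES).foreach { entry =>
  val resolved = resolveFileUrisAndPath(conf.get(entry))
  if (resolved.nonEmpty) {
    additionalProps.put(entry.key, resolved.mkString(","))  // downstream wants plain properties
  }
}
```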

View file

@ -109,21 +109,22 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
}
private def additionalJavaProperties(resource: String): Map[String, String] = {
resourceType(APP_RESOURCE_TYPE_JAVA) ++ mergeFileList("spark.jars", Seq(resource))
resourceType(APP_RESOURCE_TYPE_JAVA) ++ mergeFileList(JARS, Seq(resource))
}
private def additionalPythonProperties(resource: String): Map[String, String] = {
resourceType(APP_RESOURCE_TYPE_PYTHON) ++
mergeFileList("spark.files", Seq(resource) ++ conf.pyFiles)
mergeFileList(FILES, Seq(resource) ++ conf.pyFiles)
}
private def additionalRProperties(resource: String): Map[String, String] = {
resourceType(APP_RESOURCE_TYPE_R) ++ mergeFileList("spark.files", Seq(resource))
resourceType(APP_RESOURCE_TYPE_R) ++ mergeFileList(FILES, Seq(resource))
}
private def mergeFileList(key: String, filesToAdd: Seq[String]): Map[String, String] = {
val existing = Utils.stringToSeq(conf.get(key, ""))
Map(key -> (existing ++ filesToAdd).distinct.mkString(","))
private def mergeFileList(key: ConfigEntry[Seq[String]], filesToAdd: Seq[String])
: Map[String, String] = {
val existing = conf.get(key)
Map(key.key -> (existing ++ filesToAdd).distinct.mkString(","))
}
private def resourceType(resType: String): Map[String, String] = {

View file

@ -143,7 +143,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
val sparkConf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
.setJars(allJars)
.set("spark.files", allFiles.mkString(","))
.set(FILES, allFiles)
.set(CONTAINER_IMAGE, "spark-driver:latest")
val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
@ -154,8 +154,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
"spark.app.id" -> KubernetesTestConf.APP_ID,
KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> kubernetesConf.resourceNamePrefix,
"spark.kubernetes.submitInDriver" -> "true",
"spark.jars" -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar",
"spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt",
JARS.key -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar",
FILES.key -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt",
MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
assert(additionalProperties === expectedSparkConf)
}

View file

@ -27,6 +27,7 @@ import org.scalatest.concurrent.Eventually
import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.JARS
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.internal.config.UI.UI_ENABLED
@ -86,7 +87,7 @@ private[spark] class SparkAppConf {
def get(key: String): String = map.getOrElse(key, "")
def setJars(jars: Seq[String]): Unit = set("spark.jars", jars.mkString(","))
def setJars(jars: Seq[String]): Unit = set(JARS.key, jars.mkString(","))
override def toString: String = map.toString

View file

@ -25,6 +25,7 @@ import org.apache.spark.deploy.mesos.config._
import org.apache.spark.deploy.mesos.ui.MesosClusterUI
import org.apache.spark.deploy.rest.mesos.MesosRestServer
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Deploy._
import org.apache.spark.scheduler.cluster.mesos._
import org.apache.spark.util.{CommandLineUtils, ShutdownHookManager, SparkUncaughtExceptionHandler, Utils}

View file

@ -63,11 +63,6 @@ package object config {
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("30s")
private[spark] val RECOVERY_MODE =
ConfigBuilder("spark.deploy.recoveryMode")
.stringConf
.createWithDefault("NONE")
private[spark] val DISPATCHER_WEBUI_URL =
ConfigBuilder("spark.mesos.dispatcher.webui.url")
.doc("Set the Spark Mesos dispatcher webui_url for interacting with the " +
@ -75,13 +70,6 @@ package object config {
.stringConf
.createOptional
private[spark] val ZOOKEEPER_URL =
ConfigBuilder("spark.deploy.zookeeper.url")
.doc("When `spark.deploy.recoveryMode` is set to ZOOKEEPER, this " +
"configuration is used to set the zookeeper URL to connect to.")
.stringConf
.createOptional
private[spark] val HISTORY_SERVER_URL =
ConfigBuilder("spark.mesos.dispatcher.historyServer.url")
.doc("Set the URL of the history server. The dispatcher will then " +

View file

@ -26,6 +26,7 @@ import org.apache.zookeeper.KeeperException.NoNodeException
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkCuratorUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Deploy._
import org.apache.spark.util.Utils
/**
@ -94,13 +95,13 @@ private[spark] class ZookeeperMesosClusterPersistenceEngine(
zk: CuratorFramework,
conf: SparkConf)
extends MesosClusterPersistenceEngine with Logging {
private val WORKING_DIR =
conf.get("spark.deploy.zookeeper.dir", "/spark_mesos_dispatcher") + "/" + baseDir
private val workingDir =
conf.get(ZOOKEEPER_DIRECTORY).getOrElse("/spark_mesos_dispatcher") + "/" + baseDir
SparkCuratorUtil.mkdir(zk, WORKING_DIR)
SparkCuratorUtil.mkdir(zk, workingDir)
def path(name: String): String = {
WORKING_DIR + "/" + name
workingDir + "/" + name
}
override def expunge(name: String): Unit = {
@ -129,6 +130,6 @@ private[spark] class ZookeeperMesosClusterPersistenceEngine(
}
override def fetchAll[T](): Iterable[T] = {
zk.getChildren.forPath(WORKING_DIR).asScala.flatMap(fetch[T])
zk.getChildren.forPath(workingDir).asScala.flatMap(fetch[T])
}
}
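
`ZOOKEEPER_DIRECTORY` appears to be an optional entry, so the dispatcher-specific default directory stays at the call site as a `getOrElse` instead of being baked into a two-argument `conf.get(key, default)`. In isolation the read looks like this (the base dir value is illustrative):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Deploy.ZOOKEEPER_DIRECTORY

val conf = new SparkConf(false)
val baseDir = "mesosCluster"   // illustrative
val workingDir =
  conf.get(ZOOKEEPER_DIRECTORY).getOrElse("/spark_mesos_dispatcher") + "/" + baseDir
```

Keeping the fallback local presumably lets the standalone master and the Mesos dispatcher use different default ZooKeeper directories while sharing one entry.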

View file

@ -32,7 +32,7 @@ import org.apache.mesos.Protos.TaskStatus.Reason
import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
import org.apache.spark.deploy.mesos.{config, MesosDriverDescription}
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
import org.apache.spark.internal.config.{CORES_MAX, EXECUTOR_LIBRARY_PATH, EXECUTOR_MEMORY}
import org.apache.spark.internal.config._
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.Utils
@ -432,7 +432,7 @@ private[spark] class MesosClusterScheduler(
private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
val confUris = List(conf.getOption("spark.mesos.uris"),
desc.conf.getOption("spark.mesos.uris"),
desc.conf.getOption("spark.submit.pyFiles")).flatMap(
Some(desc.conf.get(SUBMIT_PYTHON_FILES).mkString(","))).flatMap(
_.map(_.split(",").map(_.trim))
).flatten
@ -534,16 +534,16 @@ private[spark] class MesosClusterScheduler(
desc.conf.getOption(CORES_MAX.key).foreach { v =>
options ++= Seq("--total-executor-cores", v)
}
desc.conf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
val formattedFiles = pyFiles.split(",")
.map { path => new File(sandboxPath, path.split("/").last).toString() }
.mkString(",")
options ++= Seq("--py-files", formattedFiles)
}
val pyFiles = desc.conf.get(SUBMIT_PYTHON_FILES)
val formattedFiles = pyFiles.map { path =>
new File(sandboxPath, path.split("/").last).toString()
}.mkString(",")
options ++= Seq("--py-files", formattedFiles)
// --conf
val replicatedOptionsBlacklist = Set(
"spark.jars", // Avoids duplicate classes in classpath
JARS.key, // Avoids duplicate classes in classpath
"spark.submit.deployMode", // this would be set to `cluster`, but we need client
"spark.master" // this contains the address of the dispatcher, not master
)
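
`SUBMIT_PYTHON_FILES` seems to be a `Seq[String]` entry, so the old `getOption(...)` plus split-and-trim dance collapses into a direct typed read (empty when unset). A sketch of the new shape of the `--py-files` handling, with the sandbox path as a placeholder:

```scala
import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.SUBMIT_PYTHON_FILES

val conf = new SparkConf(false).set(SUBMIT_PYTHON_FILES, Seq("deps.zip", "libs/extra.py"))

val sandboxPath = "/mnt/mesos/sandbox"   // placeholder
val pyFiles: Seq[String] = conf.get(SUBMIT_PYTHON_FILES)   // already parsed, Nil if unset
val formattedFiles = pyFiles
  .map(path => new File(sandboxPath, path.split("/").last).toString)
  .mkString(",")
// options ++= Seq("--py-files", formattedFiles)
```

Worth noting when reading the hunk above: the new code appends `--py-files` even when the sequence is empty, whereas the old code skipped the flag entirely when the property was unset.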

View file

@ -68,7 +68,7 @@ private[spark] class Client(
private val yarnClient = YarnClient.createYarnClient
private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"
private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"
// AM related configurations
private val amMemory = if (isClusterMode) {
@ -1532,8 +1532,8 @@ private[spark] class YarnClusterApplication extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
// SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
// so remove them from sparkConf here for yarn mode.
conf.remove("spark.jars")
conf.remove("spark.files")
conf.remove(JARS)
conf.remove(FILES)
new Client(new ClientArguments(args), conf).run()
}
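
Two conveniences of `ConfigEntry` show up in this file: a typed `get` that folds in the default (assuming `SUBMIT_DEPLOY_MODE` carries the same "client" default the old two-argument call supplied), and a `remove` overload keyed by the entry, so the YARN cluster entry point can drop `spark.jars`/`spark.files` without spelling out the strings. A brief sketch:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.{FILES, JARS, SUBMIT_DEPLOY_MODE}

val conf = new SparkConf(false)
val isClusterMode = conf.get(SUBMIT_DEPLOY_MODE) == "cluster"  // default applies -> false

conf.set(JARS, Seq("app.jar"))
conf.remove(JARS)    // same as conf.remove("spark.jars")
conf.remove(FILES)
assert(!conf.contains(JARS.key))
```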

View file

@ -40,6 +40,7 @@ import org.scalatest.Matchers
import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.config._
import org.apache.spark.util.{SparkConfWithEnv, Utils}
class ClientSuite extends SparkFunSuite with Matchers {
@ -368,7 +369,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
val resources = Map("fpga" -> 2, "gpu" -> 3)
ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
val conf = new SparkConf().set("spark.submit.deployMode", deployMode)
val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, deployMode)
resources.foreach { case (name, v) =>
conf.set(prefix + name, v.toString)
}

View file

@ -443,7 +443,7 @@ private object YarnClusterDriver extends Logging with Matchers {
// If we are running in yarn-cluster mode, verify that driver logs links and present and are
// in the expected format.
if (conf.get("spark.submit.deployMode") == "cluster") {
if (conf.get(SUBMIT_DEPLOY_MODE) == "cluster") {
assert(listener.driverLogs.nonEmpty)
val driverLogs = listener.driverLogs.get
assert(driverLogs.size === 2)

View file

@ -21,6 +21,7 @@ import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.sql.test.SharedSQLContext
@ -33,7 +34,7 @@ class DatasetSerializerRegistratorSuite extends QueryTest with SharedSQLContext
override protected def sparkConf: SparkConf = {
// Make sure we use the KryoRegistrator
super.sparkConf.set("spark.kryo.registrator", TestRegistrator().getClass.getCanonicalName)
super.sparkConf.set(KRYO_USER_REGISTRATORS, TestRegistrator().getClass.getCanonicalName)
}
test("Kryo registrator") {

View file

@ -45,8 +45,8 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {
private val conf = new SparkConf(false)
// Make the Java serializer write a reset instruction (TC_RESET) after each object to test
// for a bug we had with bytes written past the last object in a batch (SPARK-2792)
.set("spark.serializer.objectStreamReset", "1")
.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
.set(config.SERIALIZER_OBJECT_STREAM_RESET, 1)
.set(config.SERIALIZER, "org.apache.spark.serializer.JavaSerializer")
private def withFakeTaskContext(f: => Unit): Unit = {
val sc = new SparkContext("local", "test", conf)

View file

@ -22,7 +22,8 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream,
import scala.util.Random
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.catalyst.InternalRow
@ -309,7 +310,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
test("Spark-14521") {
val ser = new KryoSerializer(
(new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()
(new SparkConf).set(KRYO_REFERENCE_TRACKING, false)).newInstance()
val key = Seq(BoundReference(0, LongType, false))
// Testing Kryo serialization of HashedRelation