[SPARK-26482][CORE] Use ConfigEntry for hardcoded configs for ui categories
## What changes were proposed in this pull request? The PR makes hardcoded configs below to use `ConfigEntry`. * spark.ui * spark.ssl * spark.authenticate * spark.master.rest * spark.master.ui * spark.metrics * spark.admin * spark.modify.acl This patch doesn't change configs which are not relevant to SparkConf (e.g. system properties). ## How was this patch tested? Existing tests. Closes #23423 from HeartSaVioR/SPARK-26466. Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
This commit is contained in:
parent
51a6ba0181
commit
d9e4cf67c0
|
@ -29,6 +29,7 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation}
|
|||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.launcher.SparkLauncher
|
||||
import org.apache.spark.network.sasl.SecretKeyHolder
|
||||
import org.apache.spark.util.Utils
|
||||
|
@ -56,17 +57,13 @@ private[spark] class SecurityManager(
|
|||
private val WILDCARD_ACL = "*"
|
||||
|
||||
private val authOn = sparkConf.get(NETWORK_AUTH_ENABLED)
|
||||
// keep spark.ui.acls.enable for backwards compatibility with 1.0
|
||||
private var aclsOn =
|
||||
sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false))
|
||||
private var aclsOn = sparkConf.get(ACLS_ENABLE)
|
||||
|
||||
// admin acls should be set before view or modify acls
|
||||
private var adminAcls: Set[String] =
|
||||
stringToSet(sparkConf.get("spark.admin.acls", ""))
|
||||
private var adminAcls: Set[String] = sparkConf.get(ADMIN_ACLS).toSet
|
||||
|
||||
// admin group acls should be set before view or modify group acls
|
||||
private var adminAclsGroups : Set[String] =
|
||||
stringToSet(sparkConf.get("spark.admin.acls.groups", ""))
|
||||
private var adminAclsGroups: Set[String] = sparkConf.get(ADMIN_ACLS_GROUPS).toSet
|
||||
|
||||
private var viewAcls: Set[String] = _
|
||||
|
||||
|
@ -82,11 +79,11 @@ private[spark] class SecurityManager(
|
|||
private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
|
||||
Utils.getCurrentUserName())
|
||||
|
||||
setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
|
||||
setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))
|
||||
setViewAcls(defaultAclUsers, sparkConf.get(UI_VIEW_ACLS))
|
||||
setModifyAcls(defaultAclUsers, sparkConf.get(MODIFY_ACLS))
|
||||
|
||||
setViewAclsGroups(sparkConf.get("spark.ui.view.acls.groups", ""));
|
||||
setModifyAclsGroups(sparkConf.get("spark.modify.acls.groups", ""));
|
||||
setViewAclsGroups(sparkConf.get(UI_VIEW_ACLS_GROUPS))
|
||||
setModifyAclsGroups(sparkConf.get(MODIFY_ACLS_GROUPS))
|
||||
|
||||
private var secretKey: String = _
|
||||
logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
|
||||
|
@ -127,23 +124,16 @@ private[spark] class SecurityManager(
|
|||
opts
|
||||
}
|
||||
|
||||
/**
|
||||
* Split a comma separated String, filter out any empty items, and return a Set of strings
|
||||
*/
|
||||
private def stringToSet(list: String): Set[String] = {
|
||||
list.split(',').map(_.trim).filter(!_.isEmpty).toSet
|
||||
}
|
||||
|
||||
/**
|
||||
* Admin acls should be set before the view or modify acls. If you modify the admin
|
||||
* acls you should also set the view and modify acls again to pick up the changes.
|
||||
*/
|
||||
def setViewAcls(defaultUsers: Set[String], allowedUsers: String) {
|
||||
viewAcls = (adminAcls ++ defaultUsers ++ stringToSet(allowedUsers))
|
||||
def setViewAcls(defaultUsers: Set[String], allowedUsers: Seq[String]) {
|
||||
viewAcls = adminAcls ++ defaultUsers ++ allowedUsers
|
||||
logInfo("Changing view acls to: " + viewAcls.mkString(","))
|
||||
}
|
||||
|
||||
def setViewAcls(defaultUser: String, allowedUsers: String) {
|
||||
def setViewAcls(defaultUser: String, allowedUsers: Seq[String]) {
|
||||
setViewAcls(Set[String](defaultUser), allowedUsers)
|
||||
}
|
||||
|
||||
|
@ -151,8 +141,8 @@ private[spark] class SecurityManager(
|
|||
* Admin acls groups should be set before the view or modify acls groups. If you modify the admin
|
||||
* acls groups you should also set the view and modify acls groups again to pick up the changes.
|
||||
*/
|
||||
def setViewAclsGroups(allowedUserGroups: String) {
|
||||
viewAclsGroups = (adminAclsGroups ++ stringToSet(allowedUserGroups));
|
||||
def setViewAclsGroups(allowedUserGroups: Seq[String]) {
|
||||
viewAclsGroups = adminAclsGroups ++ allowedUserGroups
|
||||
logInfo("Changing view acls groups to: " + viewAclsGroups.mkString(","))
|
||||
}
|
||||
|
||||
|
@ -179,8 +169,8 @@ private[spark] class SecurityManager(
|
|||
* Admin acls should be set before the view or modify acls. If you modify the admin
|
||||
* acls you should also set the view and modify acls again to pick up the changes.
|
||||
*/
|
||||
def setModifyAcls(defaultUsers: Set[String], allowedUsers: String) {
|
||||
modifyAcls = (adminAcls ++ defaultUsers ++ stringToSet(allowedUsers))
|
||||
def setModifyAcls(defaultUsers: Set[String], allowedUsers: Seq[String]) {
|
||||
modifyAcls = adminAcls ++ defaultUsers ++ allowedUsers
|
||||
logInfo("Changing modify acls to: " + modifyAcls.mkString(","))
|
||||
}
|
||||
|
||||
|
@ -188,8 +178,8 @@ private[spark] class SecurityManager(
|
|||
* Admin acls groups should be set before the view or modify acls groups. If you modify the admin
|
||||
* acls groups you should also set the view and modify acls groups again to pick up the changes.
|
||||
*/
|
||||
def setModifyAclsGroups(allowedUserGroups: String) {
|
||||
modifyAclsGroups = (adminAclsGroups ++ stringToSet(allowedUserGroups));
|
||||
def setModifyAclsGroups(allowedUserGroups: Seq[String]) {
|
||||
modifyAclsGroups = adminAclsGroups ++ allowedUserGroups
|
||||
logInfo("Changing modify acls groups to: " + modifyAclsGroups.mkString(","))
|
||||
}
|
||||
|
||||
|
@ -216,8 +206,8 @@ private[spark] class SecurityManager(
|
|||
* Admin acls should be set before the view or modify acls. If you modify the admin
|
||||
* acls you should also set the view and modify acls again to pick up the changes.
|
||||
*/
|
||||
def setAdminAcls(adminUsers: String) {
|
||||
adminAcls = stringToSet(adminUsers)
|
||||
def setAdminAcls(adminUsers: Seq[String]) {
|
||||
adminAcls = adminUsers.toSet
|
||||
logInfo("Changing admin acls to: " + adminAcls.mkString(","))
|
||||
}
|
||||
|
||||
|
@ -225,8 +215,8 @@ private[spark] class SecurityManager(
|
|||
* Admin acls groups should be set before the view or modify acls groups. If you modify the admin
|
||||
* acls groups you should also set the view and modify acls groups again to pick up the changes.
|
||||
*/
|
||||
def setAdminAclsGroups(adminUserGroups: String) {
|
||||
adminAclsGroups = stringToSet(adminUserGroups)
|
||||
def setAdminAclsGroups(adminUserGroups: Seq[String]) {
|
||||
adminAclsGroups = adminUserGroups.toSet
|
||||
logInfo("Changing admin acls groups to: " + adminAclsGroups.mkString(","))
|
||||
}
|
||||
|
||||
|
@ -416,7 +406,7 @@ private[spark] object SecurityManager {
|
|||
|
||||
val k8sRegex = "k8s.*".r
|
||||
val SPARK_AUTH_CONF = NETWORK_AUTH_ENABLED.key
|
||||
val SPARK_AUTH_SECRET_CONF = "spark.authenticate.secret"
|
||||
val SPARK_AUTH_SECRET_CONF = AUTH_SECRET.key
|
||||
// This is used to set auth secret to an executor's env variable. It should have the same
|
||||
// value as SPARK_AUTH_SECRET_CONF set in SparkConf
|
||||
val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream,
|
|||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.Tests._
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.io.CompressionCodec
|
||||
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
|
||||
import org.apache.spark.rdd._
|
||||
|
@ -440,7 +441,7 @@ class SparkContext(config: SparkConf) extends Logging {
|
|||
}
|
||||
|
||||
_ui =
|
||||
if (conf.getBoolean("spark.ui.enabled", true)) {
|
||||
if (conf.get(UI_ENABLED)) {
|
||||
Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
|
||||
startTime))
|
||||
} else {
|
||||
|
@ -510,7 +511,7 @@ class SparkContext(config: SparkConf) extends Logging {
|
|||
_applicationId = _taskScheduler.applicationId()
|
||||
_applicationAttemptId = taskScheduler.applicationAttemptId()
|
||||
_conf.set("spark.app.id", _applicationId)
|
||||
if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
|
||||
if (_conf.get(UI_REVERSE_PROXY)) {
|
||||
System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
|
||||
}
|
||||
_ui.foreach(_.setAppId(_applicationId))
|
||||
|
|
|
@ -51,8 +51,8 @@ class LocalSparkCluster(
|
|||
|
||||
// Disable REST server on Master in this mode unless otherwise specified
|
||||
val _conf = conf.clone()
|
||||
.setIfMissing("spark.master.rest.enabled", "false")
|
||||
.set(config.SHUFFLE_SERVICE_ENABLED.key, "false")
|
||||
.setIfMissing(config.MASTER_REST_SERVER_ENABLED, false)
|
||||
.set(config.SHUFFLE_SERVICE_ENABLED, false)
|
||||
|
||||
/* Start the Master */
|
||||
val (rpcEnv, webUiPort, _) = Master.startRpcEnvAndEndpoint(localHostname, 0, 0, _conf)
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.spark.api.r.RUtils
|
|||
import org.apache.spark.deploy.rest._
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.launcher.SparkLauncher
|
||||
import org.apache.spark.util._
|
||||
|
||||
|
|
|
@ -42,10 +42,11 @@ import org.fusesource.leveldbjni.internal.NativeDB
|
|||
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
|
||||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.{DRIVER_LOG_DFS_DIR, History}
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.History._
|
||||
import org.apache.spark.internal.config.Status._
|
||||
import org.apache.spark.internal.config.Tests.IS_TESTING
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.io.CompressionCodec
|
||||
import org.apache.spark.scheduler._
|
||||
import org.apache.spark.scheduler.ReplayListenerBus._
|
||||
|
@ -105,12 +106,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
|
|||
|
||||
private val logDir = conf.get(History.HISTORY_LOG_DIR)
|
||||
|
||||
private val HISTORY_UI_ACLS_ENABLE = conf.get(History.UI_ACLS_ENABLE)
|
||||
private val HISTORY_UI_ADMIN_ACLS = conf.get(History.UI_ADMIN_ACLS)
|
||||
private val HISTORY_UI_ADMIN_ACLS_GROUPS = conf.get(History.UI_ADMIN_ACLS_GROUPS)
|
||||
logInfo(s"History server ui acls " + (if (HISTORY_UI_ACLS_ENABLE) "enabled" else "disabled") +
|
||||
"; users with admin permissions: " + HISTORY_UI_ADMIN_ACLS.toString +
|
||||
"; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)
|
||||
private val historyUiAclsEnable = conf.get(History.HISTORY_SERVER_UI_ACLS_ENABLE)
|
||||
private val historyUiAdminAcls = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS)
|
||||
private val historyUiAdminAclsGroups = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS_GROUPS)
|
||||
logInfo(s"History server ui acls " + (if (historyUiAclsEnable) "enabled" else "disabled") +
|
||||
"; users with admin permissions: " + historyUiAdminAcls.mkString(",") +
|
||||
"; groups with admin permissions" + historyUiAdminAclsGroups.mkString(","))
|
||||
|
||||
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
|
||||
// Visible for testing
|
||||
|
@ -314,6 +315,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
|
|||
|
||||
override def getLastUpdatedTime(): Long = lastScanTime.get()
|
||||
|
||||
/**
|
||||
* Split a comma separated String, filter out any empty items, and return a Sequence of strings
|
||||
*/
|
||||
private def stringToSeq(list: String): Seq[String] = {
|
||||
list.split(',').map(_.trim).filter(!_.isEmpty)
|
||||
}
|
||||
|
||||
override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
|
||||
val app = try {
|
||||
load(appId)
|
||||
|
@ -330,13 +338,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
|
|||
val conf = this.conf.clone()
|
||||
val secManager = new SecurityManager(conf)
|
||||
|
||||
secManager.setAcls(HISTORY_UI_ACLS_ENABLE)
|
||||
secManager.setAcls(historyUiAclsEnable)
|
||||
// make sure to set admin acls before view acls so they are properly picked up
|
||||
secManager.setAdminAcls(HISTORY_UI_ADMIN_ACLS + "," + attempt.adminAcls.getOrElse(""))
|
||||
secManager.setViewAcls(attempt.info.sparkUser, attempt.viewAcls.getOrElse(""))
|
||||
secManager.setAdminAclsGroups(HISTORY_UI_ADMIN_ACLS_GROUPS + "," +
|
||||
attempt.adminAclsGroups.getOrElse(""))
|
||||
secManager.setViewAclsGroups(attempt.viewAclsGroups.getOrElse(""))
|
||||
secManager.setAdminAcls(historyUiAdminAcls ++ stringToSeq(attempt.adminAcls.getOrElse("")))
|
||||
secManager.setViewAcls(attempt.info.sparkUser, stringToSeq(attempt.viewAcls.getOrElse("")))
|
||||
secManager.setAdminAclsGroups(historyUiAdminAclsGroups ++
|
||||
stringToSeq(attempt.adminAclsGroups.getOrElse("")))
|
||||
secManager.setViewAclsGroups(stringToSeq(attempt.viewAclsGroups.getOrElse("")))
|
||||
|
||||
val kvstore = try {
|
||||
diskManager match {
|
||||
|
@ -1187,11 +1195,16 @@ private[history] class AppListingListener(
|
|||
// Only parse the first env update, since any future changes don't have any effect on
|
||||
// the ACLs set for the UI.
|
||||
if (!gotEnvUpdate) {
|
||||
def emptyStringToNone(strOption: Option[String]): Option[String] = strOption match {
|
||||
case Some("") => None
|
||||
case _ => strOption
|
||||
}
|
||||
|
||||
val allProperties = event.environmentDetails("Spark Properties").toMap
|
||||
attempt.viewAcls = allProperties.get("spark.ui.view.acls")
|
||||
attempt.adminAcls = allProperties.get("spark.admin.acls")
|
||||
attempt.viewAclsGroups = allProperties.get("spark.ui.view.acls.groups")
|
||||
attempt.adminAclsGroups = allProperties.get("spark.admin.acls.groups")
|
||||
attempt.viewAcls = emptyStringToNone(allProperties.get(UI_VIEW_ACLS.key))
|
||||
attempt.adminAcls = emptyStringToNone(allProperties.get(ADMIN_ACLS.key))
|
||||
attempt.viewAclsGroups = emptyStringToNone(allProperties.get(UI_VIEW_ACLS_GROUPS.key))
|
||||
attempt.adminAclsGroups = emptyStringToNone(allProperties.get(ADMIN_ACLS_GROUPS.key))
|
||||
|
||||
gotEnvUpdate = true
|
||||
checkProgress()
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
|
|||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.History
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, UIRoot}
|
||||
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
|
||||
import org.apache.spark.ui.JettyUtils._
|
||||
|
@ -302,11 +303,10 @@ object HistoryServer extends Logging {
|
|||
config.set(SecurityManager.SPARK_AUTH_CONF, "false")
|
||||
}
|
||||
|
||||
if (config.getBoolean("spark.acls.enable", config.getBoolean("spark.ui.acls.enable", false))) {
|
||||
logInfo("Either spark.acls.enable or spark.ui.acls.enable is configured, clearing it and " +
|
||||
"only using spark.history.ui.acl.enable")
|
||||
config.set("spark.acls.enable", "false")
|
||||
config.set("spark.ui.acls.enable", "false")
|
||||
if (config.get(ACLS_ENABLE)) {
|
||||
logInfo(s"${ACLS_ENABLE.key} is configured, " +
|
||||
s"clearing it and only using ${History.HISTORY_SERVER_UI_ACLS_ENABLE.key}")
|
||||
config.set(ACLS_ENABLE, false)
|
||||
}
|
||||
|
||||
new SecurityManager(config)
|
||||
|
|
|
@ -33,6 +33,8 @@ import org.apache.spark.deploy.master.MasterMessages._
|
|||
import org.apache.spark.deploy.master.ui.MasterWebUI
|
||||
import org.apache.spark.deploy.rest.StandaloneRestServer
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.metrics.MetricsSystem
|
||||
import org.apache.spark.rpc._
|
||||
import org.apache.spark.serializer.{JavaSerializer, Serializer}
|
||||
|
@ -115,13 +117,13 @@ private[deploy] class Master(
|
|||
|
||||
// Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
|
||||
private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
|
||||
val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false)
|
||||
val reverseProxy = conf.get(UI_REVERSE_PROXY)
|
||||
if (defaultCores < 1) {
|
||||
throw new SparkException("spark.deploy.defaultCores must be positive")
|
||||
}
|
||||
|
||||
// Alternative application submission gateway that is stable across Spark versions
|
||||
private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", false)
|
||||
private val restServerEnabled = conf.get(MASTER_REST_SERVER_ENABLED)
|
||||
private var restServer: Option[StandaloneRestServer] = None
|
||||
private var restServerBoundPort: Option[Int] = None
|
||||
|
||||
|
@ -140,7 +142,7 @@ private[deploy] class Master(
|
|||
webUi.bind()
|
||||
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
|
||||
if (reverseProxy) {
|
||||
masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
|
||||
masterWebUiUrl = conf.get(UI_REVERSE_PROXY_URL).orElse(Some(masterWebUiUrl)).get
|
||||
webUi.addProxy()
|
||||
logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
|
||||
s"Applications UIs are available at $masterWebUiUrl")
|
||||
|
@ -152,7 +154,7 @@ private[deploy] class Master(
|
|||
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
|
||||
|
||||
if (restServerEnabled) {
|
||||
val port = conf.getInt("spark.master.rest.port", 6066)
|
||||
val port = conf.get(MASTER_REST_SERVER_PORT)
|
||||
restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
|
||||
}
|
||||
restServerBoundPort = restServer.map(_.start())
|
||||
|
|
|
@ -21,6 +21,7 @@ import scala.annotation.tailrec
|
|||
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.MASTER_UI_PORT
|
||||
import org.apache.spark.util.{IntParam, Utils}
|
||||
|
||||
/**
|
||||
|
@ -53,8 +54,8 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) exte
|
|||
// This mutates the SparkConf, so all accesses to it must be made after this line
|
||||
propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
|
||||
|
||||
if (conf.contains("spark.master.ui.port")) {
|
||||
webUiPort = conf.get("spark.master.ui.port").toInt
|
||||
if (conf.contains(MASTER_UI_PORT.key)) {
|
||||
webUiPort = conf.get(MASTER_UI_PORT)
|
||||
}
|
||||
|
||||
@tailrec
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.spark.deploy.master.ui
|
|||
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
|
||||
import org.apache.spark.deploy.master.Master
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.UI.UI_KILL_ENABLED
|
||||
import org.apache.spark.ui.{SparkUI, WebUI}
|
||||
import org.apache.spark.ui.JettyUtils._
|
||||
|
||||
|
@ -34,7 +35,7 @@ class MasterWebUI(
|
|||
requestedPort, master.conf, name = "MasterUI") with Logging {
|
||||
|
||||
val masterEndpointRef = master.self
|
||||
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
|
||||
val killEnabled = master.conf.get(UI_KILL_ENABLED)
|
||||
|
||||
initialize()
|
||||
|
||||
|
|
|
@ -146,7 +146,7 @@ private[rest] class StandaloneSubmitRequestServlet(
|
|||
// the driver.
|
||||
val masters = sparkProperties.get("spark.master")
|
||||
val (_, masterPort) = Utils.extractHostPortFromSparkUrl(masterUrl)
|
||||
val masterRestPort = this.conf.getInt("spark.master.rest.port", 6066)
|
||||
val masterRestPort = this.conf.get(config.MASTER_REST_SERVER_PORT)
|
||||
val updatedMasters = masters.map(
|
||||
_.replace(s":$masterRestPort", s":$masterPort")).getOrElse(masterUrl)
|
||||
val appArgs = request.appArgs
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
|
|||
import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
|
||||
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.rpc.RpcEndpointRef
|
||||
import org.apache.spark.util.{ShutdownHookManager, Utils}
|
||||
import org.apache.spark.util.logging.FileAppender
|
||||
|
@ -160,7 +161,7 @@ private[deploy] class ExecutorRunner(
|
|||
|
||||
// Add webUI log urls
|
||||
val baseUrl =
|
||||
if (conf.getBoolean("spark.ui.reverseProxy", false)) {
|
||||
if (conf.get(UI_REVERSE_PROXY)) {
|
||||
s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
|
||||
} else {
|
||||
s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.spark.deploy.master.{DriverState, Master}
|
|||
import org.apache.spark.deploy.worker.ui.WorkerWebUI
|
||||
import org.apache.spark.internal.{config, Logging}
|
||||
import org.apache.spark.internal.config.Tests.IS_TESTING
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.metrics.MetricsSystem
|
||||
import org.apache.spark.rpc._
|
||||
import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
|
||||
|
@ -165,7 +166,7 @@ private[deploy] class Worker(
|
|||
private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
|
||||
private val workerSource = new WorkerSource(this)
|
||||
|
||||
val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false)
|
||||
val reverseProxy = conf.get(UI_REVERSE_PROXY)
|
||||
|
||||
private var registerMasterFutures: Array[JFuture[_]] = null
|
||||
private var registrationRetryTimer: Option[JScheduledFuture[_]] = None
|
||||
|
|
|
@ -88,17 +88,19 @@ private[spark] object History {
|
|||
val MAX_DRIVER_LOG_AGE_S = ConfigBuilder("spark.history.fs.driverlog.cleaner.maxAge")
|
||||
.fallbackConf(MAX_LOG_AGE_S)
|
||||
|
||||
val UI_ACLS_ENABLE = ConfigBuilder("spark.history.ui.acls.enable")
|
||||
val HISTORY_SERVER_UI_ACLS_ENABLE = ConfigBuilder("spark.history.ui.acls.enable")
|
||||
.booleanConf
|
||||
.createWithDefault(false)
|
||||
|
||||
val UI_ADMIN_ACLS = ConfigBuilder("spark.history.ui.admin.acls")
|
||||
val HISTORY_SERVER_UI_ADMIN_ACLS = ConfigBuilder("spark.history.ui.admin.acls")
|
||||
.stringConf
|
||||
.createWithDefault("")
|
||||
.toSequence
|
||||
.createWithDefault(Nil)
|
||||
|
||||
val UI_ADMIN_ACLS_GROUPS = ConfigBuilder("spark.history.ui.admin.acls.groups")
|
||||
val HISTORY_SERVER_UI_ADMIN_ACLS_GROUPS = ConfigBuilder("spark.history.ui.admin.acls.groups")
|
||||
.stringConf
|
||||
.createWithDefault("")
|
||||
.toSequence
|
||||
.createWithDefault(Nil)
|
||||
|
||||
val NUM_REPLAY_THREADS = ConfigBuilder("spark.history.fs.numReplayThreads")
|
||||
.intConf
|
||||
|
|
145
core/src/main/scala/org/apache/spark/internal/config/UI.scala
Normal file
145
core/src/main/scala/org/apache/spark/internal/config/UI.scala
Normal file
|
@ -0,0 +1,145 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.internal.config
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import org.apache.spark.network.util.ByteUnit
|
||||
|
||||
private[spark] object UI {
|
||||
|
||||
val UI_SHOW_CONSOLE_PROGRESS = ConfigBuilder("spark.ui.showConsoleProgress")
|
||||
.doc("When true, show the progress bar in the console.")
|
||||
.booleanConf
|
||||
.createWithDefault(false)
|
||||
|
||||
val UI_CONSOLE_PROGRESS_UPDATE_INTERVAL =
|
||||
ConfigBuilder("spark.ui.consoleProgress.update.interval")
|
||||
.timeConf(TimeUnit.MILLISECONDS)
|
||||
.createWithDefault(200)
|
||||
|
||||
val UI_ENABLED = ConfigBuilder("spark.ui.enabled")
|
||||
.doc("Whether to run the web UI for the Spark application.")
|
||||
.booleanConf
|
||||
.createWithDefault(true)
|
||||
|
||||
val UI_PORT = ConfigBuilder("spark.ui.port")
|
||||
.doc("Port for your application's dashboard, which shows memory and workload data.")
|
||||
.intConf
|
||||
.createWithDefault(4040)
|
||||
|
||||
val UI_FILTERS = ConfigBuilder("spark.ui.filters")
|
||||
.doc("Comma separated list of filter class names to apply to the Spark Web UI.")
|
||||
.stringConf
|
||||
.toSequence
|
||||
.createWithDefault(Nil)
|
||||
|
||||
val UI_ALLOW_FRAMING_FROM = ConfigBuilder("spark.ui.allowFramingFrom")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
||||
val UI_REVERSE_PROXY = ConfigBuilder("spark.ui.reverseProxy")
|
||||
.doc("Enable running Spark Master as reverse proxy for worker and application UIs. " +
|
||||
"In this mode, Spark master will reverse proxy the worker and application UIs to enable " +
|
||||
"access without requiring direct access to their hosts. Use it with caution, as worker " +
|
||||
"and application UI will not be accessible directly, you will only be able to access them" +
|
||||
"through spark master/proxy public URL. This setting affects all the workers and " +
|
||||
"application UIs running in the cluster and must be set on all the workers, drivers " +
|
||||
" and masters.")
|
||||
.booleanConf
|
||||
.createWithDefault(false)
|
||||
|
||||
val UI_REVERSE_PROXY_URL = ConfigBuilder("spark.ui.reverseProxyUrl")
|
||||
.doc("This is the URL where your proxy is running. This URL is for proxy which is running " +
|
||||
"in front of Spark Master. This is useful when running proxy for authentication e.g. " +
|
||||
"OAuth proxy. Make sure this is a complete URL including scheme (http/https) and port to " +
|
||||
"reach your proxy.")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
||||
val UI_KILL_ENABLED = ConfigBuilder("spark.ui.killEnabled")
|
||||
.doc("Allows jobs and stages to be killed from the web UI.")
|
||||
.booleanConf
|
||||
.createWithDefault(true)
|
||||
|
||||
val UI_THREAD_DUMPS_ENABLED = ConfigBuilder("spark.ui.threadDumpsEnabled")
|
||||
.booleanConf
|
||||
.createWithDefault(true)
|
||||
|
||||
val UI_X_XSS_PROTECTION = ConfigBuilder("spark.ui.xXssProtection")
|
||||
.doc("Value for HTTP X-XSS-Protection response header")
|
||||
.stringConf
|
||||
.createWithDefaultString("1; mode=block")
|
||||
|
||||
val UI_X_CONTENT_TYPE_OPTIONS = ConfigBuilder("spark.ui.xContentTypeOptions.enabled")
|
||||
.doc("Set to 'true' for setting X-Content-Type-Options HTTP response header to 'nosniff'")
|
||||
.booleanConf
|
||||
.createWithDefault(true)
|
||||
|
||||
val UI_STRICT_TRANSPORT_SECURITY = ConfigBuilder("spark.ui.strictTransportSecurity")
|
||||
.doc("Value for HTTP Strict Transport Security Response Header")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
||||
val UI_REQUEST_HEADER_SIZE = ConfigBuilder("spark.ui.requestHeaderSize")
|
||||
.doc("Value for HTTP request header size in bytes.")
|
||||
.bytesConf(ByteUnit.BYTE)
|
||||
.createWithDefaultString("8k")
|
||||
|
||||
val UI_TIMELINE_TASKS_MAXIMUM = ConfigBuilder("spark.ui.timeline.tasks.maximum")
|
||||
.intConf
|
||||
.createWithDefault(1000)
|
||||
|
||||
val ACLS_ENABLE = ConfigBuilder("spark.acls.enable")
|
||||
.booleanConf
|
||||
.createWithDefault(false)
|
||||
|
||||
val UI_VIEW_ACLS = ConfigBuilder("spark.ui.view.acls")
|
||||
.stringConf
|
||||
.toSequence
|
||||
.createWithDefault(Nil)
|
||||
|
||||
val UI_VIEW_ACLS_GROUPS = ConfigBuilder("spark.ui.view.acls.groups")
|
||||
.stringConf
|
||||
.toSequence
|
||||
.createWithDefault(Nil)
|
||||
|
||||
val ADMIN_ACLS = ConfigBuilder("spark.admin.acls")
|
||||
.stringConf
|
||||
.toSequence
|
||||
.createWithDefault(Nil)
|
||||
|
||||
val ADMIN_ACLS_GROUPS = ConfigBuilder("spark.admin.acls.groups")
|
||||
.stringConf
|
||||
.toSequence
|
||||
.createWithDefault(Nil)
|
||||
|
||||
val MODIFY_ACLS = ConfigBuilder("spark.modify.acls")
|
||||
.stringConf
|
||||
.toSequence
|
||||
.createWithDefault(Nil)
|
||||
|
||||
val MODIFY_ACLS_GROUPS = ConfigBuilder("spark.modify.acls.groups")
|
||||
.stringConf
|
||||
.toSequence
|
||||
.createWithDefault(Nil)
|
||||
|
||||
val USER_GROUPS_MAPPING = ConfigBuilder("spark.user.groups.mapping")
|
||||
.stringConf
|
||||
.createWithDefault("org.apache.spark.security.ShellBasedGroupsMappingProvider")
|
||||
}
|
|
@ -326,6 +326,10 @@ package object config {
|
|||
.stringConf
|
||||
.createOptional
|
||||
|
||||
private[spark] val METRICS_CONF = ConfigBuilder("spark.metrics.conf")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
||||
private[spark] val PYSPARK_DRIVER_PYTHON = ConfigBuilder("spark.pyspark.driver.python")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
@ -338,11 +342,6 @@ package object config {
|
|||
private[spark] val HISTORY_UI_MAX_APPS =
|
||||
ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE)
|
||||
|
||||
private[spark] val UI_SHOW_CONSOLE_PROGRESS = ConfigBuilder("spark.ui.showConsoleProgress")
|
||||
.doc("When true, show the progress bar in the console.")
|
||||
.booleanConf
|
||||
.createWithDefault(false)
|
||||
|
||||
private[spark] val IO_ENCRYPTION_ENABLED = ConfigBuilder("spark.io.encryption.enabled")
|
||||
.booleanConf
|
||||
.createWithDefault(false)
|
||||
|
@ -446,6 +445,11 @@ package object config {
|
|||
.regexConf
|
||||
.createOptional
|
||||
|
||||
private[spark] val AUTH_SECRET =
|
||||
ConfigBuilder("spark.authenticate.secret")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
||||
private[spark] val AUTH_SECRET_BIT_LENGTH =
|
||||
ConfigBuilder("spark.authenticate.secretBitLength")
|
||||
.intConf
|
||||
|
@ -625,30 +629,6 @@ package object config {
|
|||
.toSequence
|
||||
.createWithDefault(Nil)
|
||||
|
||||
private[spark] val UI_X_XSS_PROTECTION =
|
||||
ConfigBuilder("spark.ui.xXssProtection")
|
||||
.doc("Value for HTTP X-XSS-Protection response header")
|
||||
.stringConf
|
||||
.createWithDefaultString("1; mode=block")
|
||||
|
||||
private[spark] val UI_X_CONTENT_TYPE_OPTIONS =
|
||||
ConfigBuilder("spark.ui.xContentTypeOptions.enabled")
|
||||
.doc("Set to 'true' for setting X-Content-Type-Options HTTP response header to 'nosniff'")
|
||||
.booleanConf
|
||||
.createWithDefault(true)
|
||||
|
||||
private[spark] val UI_STRICT_TRANSPORT_SECURITY =
|
||||
ConfigBuilder("spark.ui.strictTransportSecurity")
|
||||
.doc("Value for HTTP Strict Transport Security Response Header")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
||||
private[spark] val UI_REQUEST_HEADER_SIZE =
|
||||
ConfigBuilder("spark.ui.requestHeaderSize")
|
||||
.doc("Value for HTTP request header size in bytes.")
|
||||
.bytesConf(ByteUnit.BYTE)
|
||||
.createWithDefaultString("8k")
|
||||
|
||||
private[spark] val EXTRA_LISTENERS = ConfigBuilder("spark.extraListeners")
|
||||
.doc("Class names of listeners to add to SparkContext during initialization.")
|
||||
.stringConf
|
||||
|
@ -780,4 +760,16 @@ package object config {
|
|||
ConfigBuilder("spark.executor.logs.rolling.enableCompression")
|
||||
.booleanConf
|
||||
.createWithDefault(false)
|
||||
|
||||
private[spark] val MASTER_REST_SERVER_ENABLED = ConfigBuilder("spark.master.rest.enabled")
|
||||
.booleanConf
|
||||
.createWithDefault(false)
|
||||
|
||||
private[spark] val MASTER_REST_SERVER_PORT = ConfigBuilder("spark.master.rest.port")
|
||||
.intConf
|
||||
.createWithDefault(6066)
|
||||
|
||||
private[spark] val MASTER_UI_PORT = ConfigBuilder("spark.master.ui.port")
|
||||
.intConf
|
||||
.createWithDefault(8080)
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import scala.util.matching.Regex
|
|||
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.METRICS_CONF
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
|
||||
|
@ -52,7 +53,7 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
|
|||
// Add default properties in case there's no properties file
|
||||
setDefaultProperties(properties)
|
||||
|
||||
loadPropertiesFromFile(conf.getOption("spark.metrics.conf"))
|
||||
loadPropertiesFromFile(conf.get(METRICS_CONF))
|
||||
|
||||
// Also look for the properties in provided Spark configuration
|
||||
val prefix = "spark.metrics.conf."
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.util.{Timer, TimerTask}
|
|||
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.status.api.v1.StageData
|
||||
|
||||
/**
|
||||
|
@ -33,8 +34,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
|
|||
// Carriage return
|
||||
private val CR = '\r'
|
||||
// Update period of progress bar, in milliseconds
|
||||
private val updatePeriodMSec =
|
||||
sc.getConf.getTimeAsMs("spark.ui.consoleProgress.update.interval", "200")
|
||||
private val updatePeriodMSec = sc.getConf.get(UI_CONSOLE_PROGRESS_UPDATE_INTERVAL)
|
||||
// Delay to show up a progress bar, in milliseconds
|
||||
private val firstDelayMSec = 500L
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
|
|||
import org.apache.commons.lang3.StringEscapeUtils
|
||||
|
||||
import org.apache.spark.{SecurityManager, SparkConf}
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.UI._
|
||||
|
||||
/**
|
||||
* A servlet filter that implements HTTP security features. The following actions are taken
|
||||
|
|
|
@ -40,7 +40,7 @@ import org.json4s.jackson.JsonMethods.{pretty, render}
|
|||
|
||||
import org.apache.spark.{SecurityManager, SparkConf, SSLOptions}
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
/**
|
||||
|
@ -500,7 +500,7 @@ private[spark] case class ServerInfo(
|
|||
* of the chain to perform security-related functions.
|
||||
*/
|
||||
private def addFilters(handler: ServletContextHandler, securityMgr: SecurityManager): Unit = {
|
||||
conf.getOption("spark.ui.filters").toSeq.flatMap(Utils.stringToSeq).foreach { filter =>
|
||||
conf.get(UI_FILTERS).foreach { filter =>
|
||||
logInfo(s"Adding filter to ${handler.getContextPath()}: $filter")
|
||||
val oldParams = conf.getOption(s"spark.$filter.params").toSeq
|
||||
.flatMap(Utils.stringToSeq)
|
||||
|
|
|
@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
|
|||
|
||||
import org.apache.spark.{JobExecutionStatus, SecurityManager, SparkConf, SparkContext}
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.scheduler._
|
||||
import org.apache.spark.status.AppStatusStore
|
||||
import org.apache.spark.status.api.v1._
|
||||
|
@ -50,7 +51,7 @@ private[spark] class SparkUI private (
|
|||
with Logging
|
||||
with UIRoot {
|
||||
|
||||
val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)
|
||||
val killEnabled = sc.map(_.conf.get(UI_KILL_ENABLED)).getOrElse(false)
|
||||
|
||||
var appId: String = _
|
||||
|
||||
|
@ -151,12 +152,11 @@ private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)
|
|||
}
|
||||
|
||||
private[spark] object SparkUI {
|
||||
val DEFAULT_PORT = 4040
|
||||
val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
|
||||
val DEFAULT_POOL_NAME = "default"
|
||||
|
||||
def getUIPort(conf: SparkConf): Int = {
|
||||
conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
|
||||
conf.get(UI_PORT)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -21,6 +21,7 @@ import javax.servlet.http.HttpServletRequest
|
|||
|
||||
import scala.xml.Node
|
||||
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.ui.{SparkUI, SparkUITab, UIUtils, WebUIPage}
|
||||
|
||||
private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
|
||||
|
@ -29,7 +30,7 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec
|
|||
|
||||
private def init(): Unit = {
|
||||
val threadDumpEnabled =
|
||||
parent.sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true)
|
||||
parent.sc.isDefined && parent.conf.get(UI_THREAD_DUMPS_ENABLED)
|
||||
|
||||
attachPage(new ExecutorsPage(this, threadDumpEnabled))
|
||||
if (threadDumpEnabled) {
|
||||
|
|
|
@ -27,6 +27,7 @@ import scala.xml.{Node, Unparsed}
|
|||
|
||||
import org.apache.commons.lang3.StringEscapeUtils
|
||||
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.scheduler.TaskLocality
|
||||
import org.apache.spark.status._
|
||||
import org.apache.spark.status.api.v1._
|
||||
|
@ -63,7 +64,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
|
|||
|
||||
// TODO: We should consider increasing the number of this parameter over time
|
||||
// if we find that it's okay.
|
||||
private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000)
|
||||
private val MAX_TIMELINE_TASKS = parent.conf.get(UI_TIMELINE_TASKS_MAXIMUM)
|
||||
|
||||
private def getLocalitySummaryString(localitySummary: Map[String, Long]): String = {
|
||||
val names = Map(
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
|
|||
import org.apache.spark.internal.{config, Logging}
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.Tests.IS_TESTING
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.launcher.SparkLauncher
|
||||
import org.apache.spark.network.util.JavaUtils
|
||||
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
|
||||
|
@ -2387,8 +2388,7 @@ private[spark] object Utils extends Logging {
|
|||
|
||||
// Returns the groups to which the current user belongs.
|
||||
def getCurrentUserGroups(sparkConf: SparkConf, username: String): Set[String] = {
|
||||
val groupProviderClassName = sparkConf.get("spark.user.groups.mapping",
|
||||
"org.apache.spark.security.ShellBasedGroupsMappingProvider")
|
||||
val groupProviderClassName = sparkConf.get(USER_GROUPS_MAPPING)
|
||||
if (groupProviderClassName != "") {
|
||||
try {
|
||||
val groupMappingServiceProvider = classForName(groupProviderClassName).
|
||||
|
|
|
@ -24,6 +24,7 @@ import scala.reflect.ClassTag
|
|||
import com.google.common.io.ByteStreams
|
||||
import org.apache.hadoop.fs.Path
|
||||
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.io.CompressionCodec
|
||||
import org.apache.spark.rdd._
|
||||
import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
|
||||
|
@ -589,7 +590,7 @@ class CheckpointCompressionSuite extends SparkFunSuite with LocalSparkContext {
|
|||
withTempDir { checkpointDir =>
|
||||
val conf = new SparkConf()
|
||||
.set("spark.checkpoint.compress", "true")
|
||||
.set("spark.ui.enabled", "false")
|
||||
.set(UI_ENABLED.key, "false")
|
||||
sc = new SparkContext("local", "test", conf)
|
||||
sc.setCheckpointDir(checkpointDir.toString)
|
||||
val rdd = sc.makeRDD(1 to 20, numSlices = 1)
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.Base64
|
|||
import org.apache.hadoop.security.UserGroupInformation
|
||||
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.launcher.SparkLauncher
|
||||
import org.apache.spark.security.GroupMappingServiceProvider
|
||||
import org.apache.spark.util.{ResetSystemProperties, SparkConfWithEnv, Utils}
|
||||
|
@ -43,11 +44,11 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
|
|||
|
||||
test("set security with conf") {
|
||||
val conf = new SparkConf
|
||||
conf.set("spark.authenticate", "true")
|
||||
conf.set("spark.authenticate.secret", "good")
|
||||
conf.set("spark.ui.acls.enable", "true")
|
||||
conf.set("spark.ui.view.acls", "user1,user2")
|
||||
val securityManager = new SecurityManager(conf);
|
||||
conf.set(NETWORK_AUTH_ENABLED, true)
|
||||
conf.set(AUTH_SECRET, "good")
|
||||
conf.set(ACLS_ENABLE, true)
|
||||
conf.set(UI_VIEW_ACLS, Seq("user1", "user2"))
|
||||
val securityManager = new SecurityManager(conf)
|
||||
assert(securityManager.isAuthenticationEnabled() === true)
|
||||
assert(securityManager.aclsEnabled() === true)
|
||||
assert(securityManager.checkUIViewPermissions("user1") === true)
|
||||
|
@ -57,10 +58,10 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
|
|||
|
||||
test("set security with conf for groups") {
|
||||
val conf = new SparkConf
|
||||
conf.set("spark.authenticate", "true")
|
||||
conf.set("spark.authenticate.secret", "good")
|
||||
conf.set("spark.ui.acls.enable", "true")
|
||||
conf.set("spark.ui.view.acls.groups", "group1,group2")
|
||||
conf.set(NETWORK_AUTH_ENABLED, true)
|
||||
conf.set(AUTH_SECRET, "good")
|
||||
conf.set(ACLS_ENABLE, true)
|
||||
conf.set(UI_VIEW_ACLS_GROUPS, Seq("group1", "group2"))
|
||||
// default ShellBasedGroupsMappingProvider is used to resolve user groups
|
||||
val securityManager = new SecurityManager(conf);
|
||||
// assuming executing user does not belong to group1,group2
|
||||
|
@ -68,27 +69,27 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
|
|||
assert(securityManager.checkUIViewPermissions("user2") === false)
|
||||
|
||||
val conf2 = new SparkConf
|
||||
conf2.set("spark.authenticate", "true")
|
||||
conf2.set("spark.authenticate.secret", "good")
|
||||
conf2.set("spark.ui.acls.enable", "true")
|
||||
conf2.set("spark.ui.view.acls.groups", "group1,group2")
|
||||
conf2.set(NETWORK_AUTH_ENABLED, true)
|
||||
conf2.set(AUTH_SECRET, "good")
|
||||
conf2.set(ACLS_ENABLE, true)
|
||||
conf2.set(UI_VIEW_ACLS_GROUPS, Seq("group1", "group2"))
|
||||
// explicitly specify a custom GroupsMappingServiceProvider
|
||||
conf2.set("spark.user.groups.mapping", "org.apache.spark.DummyGroupMappingServiceProvider")
|
||||
conf2.set(USER_GROUPS_MAPPING, "org.apache.spark.DummyGroupMappingServiceProvider")
|
||||
|
||||
val securityManager2 = new SecurityManager(conf2);
|
||||
val securityManager2 = new SecurityManager(conf2)
|
||||
// group4,group5 do not match
|
||||
assert(securityManager2.checkUIViewPermissions("user1") === true)
|
||||
assert(securityManager2.checkUIViewPermissions("user2") === true)
|
||||
|
||||
val conf3 = new SparkConf
|
||||
conf3.set("spark.authenticate", "true")
|
||||
conf3.set("spark.authenticate.secret", "good")
|
||||
conf3.set("spark.ui.acls.enable", "true")
|
||||
conf3.set("spark.ui.view.acls.groups", "group4,group5")
|
||||
conf3.set(NETWORK_AUTH_ENABLED, true)
|
||||
conf3.set(AUTH_SECRET, "good")
|
||||
conf3.set(ACLS_ENABLE, true)
|
||||
conf3.set(UI_VIEW_ACLS_GROUPS, Seq("group4", "group5"))
|
||||
// explicitly specify a bogus GroupsMappingServiceProvider
|
||||
conf3.set("spark.user.groups.mapping", "BogusServiceProvider")
|
||||
conf3.set(USER_GROUPS_MAPPING, "BogusServiceProvider")
|
||||
|
||||
val securityManager3 = new SecurityManager(conf3);
|
||||
val securityManager3 = new SecurityManager(conf3)
|
||||
// BogusServiceProvider cannot be loaded and an error is logged returning an empty group set
|
||||
assert(securityManager3.checkUIViewPermissions("user1") === false)
|
||||
assert(securityManager3.checkUIViewPermissions("user2") === false)
|
||||
|
@ -96,7 +97,7 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
|
|||
|
||||
test("set security with api") {
|
||||
val conf = new SparkConf
|
||||
conf.set("spark.ui.view.acls", "user1,user2")
|
||||
conf.set(UI_VIEW_ACLS, Seq("user1", "user2"))
|
||||
val securityManager = new SecurityManager(conf);
|
||||
securityManager.setAcls(true)
|
||||
assert(securityManager.aclsEnabled() === true)
|
||||
|
@ -108,7 +109,7 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
|
|||
|
||||
securityManager.setAcls(true)
|
||||
assert(securityManager.aclsEnabled() === true)
|
||||
securityManager.setViewAcls(Set[String]("user5"), "user6,user7")
|
||||
securityManager.setViewAcls(Set[String]("user5"), Seq("user6", "user7"))
|
||||
assert(securityManager.checkUIViewPermissions("user1") === false)
|
||||
assert(securityManager.checkUIViewPermissions("user5") === true)
|
||||
assert(securityManager.checkUIViewPermissions("user6") === true)
|
||||
|
@ -119,41 +120,41 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
|
|||
|
||||
test("set security with api for groups") {
|
||||
val conf = new SparkConf
|
||||
conf.set("spark.user.groups.mapping", "org.apache.spark.DummyGroupMappingServiceProvider")
|
||||
conf.set(USER_GROUPS_MAPPING, "org.apache.spark.DummyGroupMappingServiceProvider")
|
||||
|
||||
val securityManager = new SecurityManager(conf);
|
||||
val securityManager = new SecurityManager(conf)
|
||||
securityManager.setAcls(true)
|
||||
securityManager.setViewAclsGroups("group1,group2")
|
||||
securityManager.setViewAclsGroups(Seq("group1", "group2"))
|
||||
|
||||
// group1,group2 match
|
||||
assert(securityManager.checkUIViewPermissions("user1") === true)
|
||||
assert(securityManager.checkUIViewPermissions("user2") === true)
|
||||
|
||||
// change groups so they do not match
|
||||
securityManager.setViewAclsGroups("group4,group5")
|
||||
securityManager.setViewAclsGroups(Seq("group4", "group5"))
|
||||
assert(securityManager.checkUIViewPermissions("user1") === false)
|
||||
assert(securityManager.checkUIViewPermissions("user2") === false)
|
||||
|
||||
val conf2 = new SparkConf
|
||||
conf.set("spark.user.groups.mapping", "BogusServiceProvider")
|
||||
conf.set(USER_GROUPS_MAPPING, "BogusServiceProvider")
|
||||
|
||||
val securityManager2 = new SecurityManager(conf2)
|
||||
securityManager2.setAcls(true)
|
||||
securityManager2.setViewAclsGroups("group1,group2")
|
||||
securityManager2.setViewAclsGroups(Seq("group1", "group2"))
|
||||
|
||||
// group1,group2 do not match because of BogusServiceProvider
|
||||
assert(securityManager.checkUIViewPermissions("user1") === false)
|
||||
assert(securityManager.checkUIViewPermissions("user2") === false)
|
||||
|
||||
// setting viewAclsGroups to empty should still not match because of BogusServiceProvider
|
||||
securityManager2.setViewAclsGroups("")
|
||||
securityManager2.setViewAclsGroups(Nil)
|
||||
assert(securityManager.checkUIViewPermissions("user1") === false)
|
||||
assert(securityManager.checkUIViewPermissions("user2") === false)
|
||||
}
|
||||
|
||||
test("set security modify acls") {
|
||||
val conf = new SparkConf
|
||||
conf.set("spark.modify.acls", "user1,user2")
|
||||
conf.set(MODIFY_ACLS, Seq("user1", "user2"))
|
||||
|
||||
val securityManager = new SecurityManager(conf);
|
||||
securityManager.setAcls(true)
|
||||
|
@ -166,7 +167,7 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
|
|||
|
||||
securityManager.setAcls(true)
|
||||
assert(securityManager.aclsEnabled() === true)
|
||||
securityManager.setModifyAcls(Set("user5"), "user6,user7")
|
||||
securityManager.setModifyAcls(Set("user5"), Seq("user6", "user7"))
|
||||
assert(securityManager.checkModifyPermissions("user1") === false)
|
||||
assert(securityManager.checkModifyPermissions("user5") === true)
|
||||
assert(securityManager.checkModifyPermissions("user6") === true)
|
||||
|
@ -177,34 +178,35 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
|
|||
|
||||
test("set security modify acls for groups") {
|
||||
val conf = new SparkConf
|
||||
conf.set("spark.user.groups.mapping", "org.apache.spark.DummyGroupMappingServiceProvider")
|
||||
conf.set(USER_GROUPS_MAPPING, "org.apache.spark.DummyGroupMappingServiceProvider")
|
||||
|
||||
val securityManager = new SecurityManager(conf);
|
||||
val securityManager = new SecurityManager(conf)
|
||||
securityManager.setAcls(true)
|
||||
securityManager.setModifyAclsGroups("group1,group2")
|
||||
securityManager.setModifyAclsGroups(Seq("group1", "group2"))
|
||||
|
||||
// group1,group2 match
|
||||
assert(securityManager.checkModifyPermissions("user1") === true)
|
||||
assert(securityManager.checkModifyPermissions("user2") === true)
|
||||
|
||||
// change groups so they do not match
|
||||
securityManager.setModifyAclsGroups("group4,group5")
|
||||
securityManager.setModifyAclsGroups(Seq("group4", "group5"))
|
||||
assert(securityManager.checkModifyPermissions("user1") === false)
|
||||
assert(securityManager.checkModifyPermissions("user2") === false)
|
||||
|
||||
// change so they match again
|
||||
securityManager.setModifyAclsGroups("group2,group3")
|
||||
securityManager.setModifyAclsGroups(Seq("group2", "group3"))
|
||||
|
||||
assert(securityManager.checkModifyPermissions("user1") === true)
|
||||
assert(securityManager.checkModifyPermissions("user2") === true)
|
||||
}
|
||||
|
||||
test("set security admin acls") {
|
||||
val conf = new SparkConf
|
||||
conf.set("spark.admin.acls", "user1,user2")
|
||||
conf.set("spark.ui.view.acls", "user3")
|
||||
conf.set("spark.modify.acls", "user4")
|
||||
conf.set(ADMIN_ACLS, Seq("user1", "user2"))
|
||||
conf.set(UI_VIEW_ACLS, Seq("user3"))
|
||||
conf.set(MODIFY_ACLS, Seq("user4"))
|
||||
|
||||
val securityManager = new SecurityManager(conf);
|
||||
val securityManager = new SecurityManager(conf)
|
||||
securityManager.setAcls(true)
|
||||
assert(securityManager.aclsEnabled() === true)
|
||||
|
||||
|
@ -221,9 +223,9 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
|
|||
assert(securityManager.checkUIViewPermissions("user5") === false)
|
||||
assert(securityManager.checkUIViewPermissions(null) === true)
|
||||
|
||||
securityManager.setAdminAcls("user6")
|
||||
securityManager.setViewAcls(Set[String]("user8"), "user9")
|
||||
securityManager.setModifyAcls(Set("user11"), "user9")
|
||||
securityManager.setAdminAcls(Seq("user6"))
|
||||
securityManager.setViewAcls(Set[String]("user8"), Seq("user9"))
|
||||
securityManager.setModifyAcls(Set("user11"), Seq("user9"))
|
||||
assert(securityManager.checkModifyPermissions("user6") === true)
|
||||
assert(securityManager.checkModifyPermissions("user11") === true)
|
||||
assert(securityManager.checkModifyPermissions("user9") === true)
|
||||
|
@ -240,12 +242,12 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
|
|||
|
||||
test("set security admin acls for groups") {
|
||||
val conf = new SparkConf
|
||||
conf.set("spark.admin.acls.groups", "group1")
|
||||
conf.set("spark.ui.view.acls.groups", "group2")
|
||||
conf.set("spark.modify.acls.groups", "group3")
|
||||
conf.set("spark.user.groups.mapping", "org.apache.spark.DummyGroupMappingServiceProvider")
|
||||
conf.set(ADMIN_ACLS_GROUPS, Seq("group1"))
|
||||
conf.set(UI_VIEW_ACLS_GROUPS, Seq("group2"))
|
||||
conf.set(MODIFY_ACLS_GROUPS, Seq("group3"))
|
||||
conf.set(USER_GROUPS_MAPPING, "org.apache.spark.DummyGroupMappingServiceProvider")
|
||||
|
||||
val securityManager = new SecurityManager(conf);
|
||||
val securityManager = new SecurityManager(conf)
|
||||
securityManager.setAcls(true)
|
||||
assert(securityManager.aclsEnabled() === true)
|
||||
|
||||
|
@ -254,38 +256,38 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
|
|||
assert(securityManager.checkUIViewPermissions("user1") === true)
|
||||
|
||||
// change admin groups so they do not match. view and modify groups are set to admin groups
|
||||
securityManager.setAdminAclsGroups("group4,group5")
|
||||
securityManager.setAdminAclsGroups(Seq("group4", "group5"))
|
||||
// invoke the set ui and modify to propagate the changes
|
||||
securityManager.setViewAclsGroups("")
|
||||
securityManager.setModifyAclsGroups("")
|
||||
securityManager.setViewAclsGroups(Nil)
|
||||
securityManager.setModifyAclsGroups(Nil)
|
||||
|
||||
assert(securityManager.checkModifyPermissions("user1") === false)
|
||||
assert(securityManager.checkUIViewPermissions("user1") === false)
|
||||
|
||||
// change modify groups so they match
|
||||
securityManager.setModifyAclsGroups("group3")
|
||||
securityManager.setModifyAclsGroups(Seq("group3"))
|
||||
assert(securityManager.checkModifyPermissions("user1") === true)
|
||||
assert(securityManager.checkUIViewPermissions("user1") === false)
|
||||
|
||||
// change view groups so they match
|
||||
securityManager.setViewAclsGroups("group2")
|
||||
securityManager.setModifyAclsGroups("group4")
|
||||
securityManager.setViewAclsGroups(Seq("group2"))
|
||||
securityManager.setModifyAclsGroups(Seq("group4"))
|
||||
assert(securityManager.checkModifyPermissions("user1") === false)
|
||||
assert(securityManager.checkUIViewPermissions("user1") === true)
|
||||
|
||||
// change modify and view groups so they do not match
|
||||
securityManager.setViewAclsGroups("group7")
|
||||
securityManager.setModifyAclsGroups("group8")
|
||||
securityManager.setViewAclsGroups(Seq("group7"))
|
||||
securityManager.setModifyAclsGroups(Seq("group8"))
|
||||
assert(securityManager.checkModifyPermissions("user1") === false)
|
||||
assert(securityManager.checkUIViewPermissions("user1") === false)
|
||||
}
|
||||
|
||||
test("set security with * in acls") {
|
||||
val conf = new SparkConf
|
||||
conf.set("spark.ui.acls.enable", "true")
|
||||
conf.set("spark.admin.acls", "user1,user2")
|
||||
conf.set("spark.ui.view.acls", "*")
|
||||
conf.set("spark.modify.acls", "user4")
|
||||
conf.set(ACLS_ENABLE.key, "true")
|
||||
conf.set(ADMIN_ACLS, Seq("user1", "user2"))
|
||||
conf.set(UI_VIEW_ACLS, Seq("*"))
|
||||
conf.set(MODIFY_ACLS, Seq("user4"))
|
||||
|
||||
val securityManager = new SecurityManager(conf)
|
||||
assert(securityManager.aclsEnabled() === true)
|
||||
|
@ -299,22 +301,22 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
|
|||
assert(securityManager.checkModifyPermissions("user8") === false)
|
||||
|
||||
// check for modifyAcls with *
|
||||
securityManager.setModifyAcls(Set("user4"), "*")
|
||||
securityManager.setModifyAcls(Set("user4"), Seq("*"))
|
||||
assert(securityManager.checkModifyPermissions("user7") === true)
|
||||
assert(securityManager.checkModifyPermissions("user8") === true)
|
||||
|
||||
securityManager.setAdminAcls("user1,user2")
|
||||
securityManager.setModifyAcls(Set("user1"), "user2")
|
||||
securityManager.setViewAcls(Set("user1"), "user2")
|
||||
securityManager.setAdminAcls(Seq("user1", "user2"))
|
||||
securityManager.setModifyAcls(Set("user1"), Seq("user2"))
|
||||
securityManager.setViewAcls(Set("user1"), Seq("user2"))
|
||||
assert(securityManager.checkUIViewPermissions("user5") === false)
|
||||
assert(securityManager.checkUIViewPermissions("user6") === false)
|
||||
assert(securityManager.checkModifyPermissions("user7") === false)
|
||||
assert(securityManager.checkModifyPermissions("user8") === false)
|
||||
|
||||
// check for adminAcls with *
|
||||
securityManager.setAdminAcls("user1,*")
|
||||
securityManager.setModifyAcls(Set("user1"), "user2")
|
||||
securityManager.setViewAcls(Set("user1"), "user2")
|
||||
securityManager.setAdminAcls(Seq("user1", "*"))
|
||||
securityManager.setModifyAcls(Set("user1"), Seq("user2"))
|
||||
securityManager.setViewAcls(Set("user1"), Seq("user2"))
|
||||
assert(securityManager.checkUIViewPermissions("user5") === true)
|
||||
assert(securityManager.checkUIViewPermissions("user6") === true)
|
||||
assert(securityManager.checkModifyPermissions("user7") === true)
|
||||
|
@ -323,10 +325,10 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
|
|||
|
||||
test("set security with * in acls for groups") {
|
||||
val conf = new SparkConf
|
||||
conf.set("spark.ui.acls.enable", "true")
|
||||
conf.set("spark.admin.acls.groups", "group4,group5")
|
||||
conf.set("spark.ui.view.acls.groups", "*")
|
||||
conf.set("spark.modify.acls.groups", "group6")
|
||||
conf.set(ACLS_ENABLE, true)
|
||||
conf.set(ADMIN_ACLS_GROUPS, Seq("group4", "group5"))
|
||||
conf.set(UI_VIEW_ACLS_GROUPS, Seq("*"))
|
||||
conf.set(MODIFY_ACLS_GROUPS, Seq("group6"))
|
||||
|
||||
val securityManager = new SecurityManager(conf)
|
||||
assert(securityManager.aclsEnabled() === true)
|
||||
|
@ -338,17 +340,17 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
|
|||
assert(securityManager.checkModifyPermissions("user2") === false)
|
||||
|
||||
// check for modifyAcls with *
|
||||
securityManager.setModifyAclsGroups("*")
|
||||
securityManager.setViewAclsGroups("group6")
|
||||
securityManager.setModifyAclsGroups(Seq("*"))
|
||||
securityManager.setViewAclsGroups(Seq("group6"))
|
||||
assert(securityManager.checkUIViewPermissions("user1") === false)
|
||||
assert(securityManager.checkUIViewPermissions("user2") === false)
|
||||
assert(securityManager.checkModifyPermissions("user1") === true)
|
||||
assert(securityManager.checkModifyPermissions("user2") === true)
|
||||
|
||||
// check for adminAcls with *
|
||||
securityManager.setAdminAclsGroups("group9,*")
|
||||
securityManager.setModifyAclsGroups("group4,group5")
|
||||
securityManager.setViewAclsGroups("group6,group7")
|
||||
securityManager.setAdminAclsGroups(Seq("group9", "*"))
|
||||
securityManager.setModifyAclsGroups(Seq("group4", "group5"))
|
||||
securityManager.setViewAclsGroups(Seq("group6", "group7"))
|
||||
assert(securityManager.checkUIViewPermissions("user5") === true)
|
||||
assert(securityManager.checkUIViewPermissions("user6") === true)
|
||||
assert(securityManager.checkModifyPermissions("user7") === true)
|
||||
|
@ -367,13 +369,13 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
|
|||
assert(securityManager.checkModifyPermissions("user1") === false)
|
||||
|
||||
// set groups only
|
||||
securityManager.setAdminAclsGroups("group1,group2")
|
||||
securityManager.setAdminAclsGroups(Seq("group1", "group2"))
|
||||
assert(securityManager.checkUIViewPermissions("user1") === false)
|
||||
assert(securityManager.checkModifyPermissions("user1") === false)
|
||||
}
|
||||
|
||||
test("missing secret authentication key") {
|
||||
val conf = new SparkConf().set("spark.authenticate", "true")
|
||||
val conf = new SparkConf().set(NETWORK_AUTH_ENABLED, true)
|
||||
val mgr = new SecurityManager(conf)
|
||||
intercept[IllegalArgumentException] {
|
||||
mgr.getSecretKey()
|
||||
|
|
|
@ -33,7 +33,8 @@ import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFor
|
|||
import org.scalatest.Matchers._
|
||||
import org.scalatest.concurrent.Eventually
|
||||
|
||||
import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart}
|
||||
import org.apache.spark.shuffle.FetchFailedException
|
||||
import org.apache.spark.util.{ThreadUtils, Utils}
|
||||
|
@ -665,7 +666,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
|
|||
val conf = new SparkConf()
|
||||
.setMaster("local-cluster[1,2,1024]")
|
||||
.setAppName("test-cluster")
|
||||
.set("spark.ui.enabled", "false")
|
||||
.set(UI_ENABLED.key, "false")
|
||||
// Disable this so that if a task is running, we can make sure the executor will always send
|
||||
// task metrics via heartbeat to driver.
|
||||
.set(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key, "false")
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.spark.deploy.SparkSubmit._
|
|||
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.launcher.SparkLauncher
|
||||
import org.apache.spark.scheduler.EventLoggingListener
|
||||
import org.apache.spark.util.{CommandLineUtils, ResetSystemProperties, Utils}
|
||||
|
@ -289,7 +290,7 @@ class SparkSubmitSuite
|
|||
conf.get("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt")
|
||||
conf.get("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
|
||||
conf.get("spark.app.name") should be ("beauty")
|
||||
conf.get("spark.ui.enabled") should be ("false")
|
||||
conf.get(UI_ENABLED) should be (false)
|
||||
sys.props("SPARK_SUBMIT") should be ("true")
|
||||
}
|
||||
|
||||
|
@ -328,7 +329,7 @@ class SparkSubmitSuite
|
|||
conf.get("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
|
||||
conf.get("spark.yarn.dist.jars") should include
|
||||
regex (".*one.jar,.*two.jar,.*three.jar,.*thejar.jar")
|
||||
conf.get("spark.ui.enabled") should be ("false")
|
||||
conf.get(UI_ENABLED) should be (false)
|
||||
sys.props("SPARK_SUBMIT") should be ("true")
|
||||
}
|
||||
|
||||
|
@ -377,9 +378,9 @@ class SparkSubmitSuite
|
|||
confMap.keys should contain ("spark.driver.memory")
|
||||
confMap.keys should contain ("spark.driver.cores")
|
||||
confMap.keys should contain ("spark.driver.supervise")
|
||||
confMap.keys should contain ("spark.ui.enabled")
|
||||
confMap.keys should contain (UI_ENABLED.key)
|
||||
confMap.keys should contain ("spark.submit.deployMode")
|
||||
conf.get("spark.ui.enabled") should be ("false")
|
||||
conf.get(UI_ENABLED) should be (false)
|
||||
}
|
||||
|
||||
test("handles standalone client mode") {
|
||||
|
@ -401,7 +402,7 @@ class SparkSubmitSuite
|
|||
classpath(0) should endWith ("thejar.jar")
|
||||
conf.get("spark.executor.memory") should be ("5g")
|
||||
conf.get("spark.cores.max") should be ("5")
|
||||
conf.get("spark.ui.enabled") should be ("false")
|
||||
conf.get(UI_ENABLED) should be (false)
|
||||
}
|
||||
|
||||
test("handles mesos client mode") {
|
||||
|
@ -423,7 +424,7 @@ class SparkSubmitSuite
|
|||
classpath(0) should endWith ("thejar.jar")
|
||||
conf.get("spark.executor.memory") should be ("5g")
|
||||
conf.get("spark.cores.max") should be ("5")
|
||||
conf.get("spark.ui.enabled") should be ("false")
|
||||
conf.get(UI_ENABLED) should be (false)
|
||||
}
|
||||
|
||||
test("handles k8s cluster mode") {
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
|
|||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.DRIVER_LOG_DFS_DIR
|
||||
import org.apache.spark.internal.config.History._
|
||||
import org.apache.spark.internal.config.UI.{ADMIN_ACLS, ADMIN_ACLS_GROUPS, USER_GROUPS_MAPPING}
|
||||
import org.apache.spark.io._
|
||||
import org.apache.spark.scheduler._
|
||||
import org.apache.spark.security.GroupMappingServiceProvider
|
||||
|
@ -644,12 +645,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
|
|||
|
||||
// Test both history ui admin acls and application acls are configured.
|
||||
val conf1 = createTestConf()
|
||||
.set(UI_ACLS_ENABLE, true)
|
||||
.set(UI_ADMIN_ACLS, "user1,user2")
|
||||
.set(UI_ADMIN_ACLS_GROUPS, "group1")
|
||||
.set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
|
||||
.set(HISTORY_SERVER_UI_ACLS_ENABLE, true)
|
||||
.set(HISTORY_SERVER_UI_ADMIN_ACLS, Seq("user1", "user2"))
|
||||
.set(HISTORY_SERVER_UI_ADMIN_ACLS_GROUPS, Seq("group1"))
|
||||
.set(USER_GROUPS_MAPPING, classOf[TestGroupsMappingProvider].getName)
|
||||
|
||||
createAndCheck(conf1, ("spark.admin.acls", "user"), ("spark.admin.acls.groups", "group")) {
|
||||
createAndCheck(conf1, (ADMIN_ACLS.key, "user"), (ADMIN_ACLS_GROUPS.key, "group")) {
|
||||
securityManager =>
|
||||
// Test whether user has permission to access UI.
|
||||
securityManager.checkUIViewPermissions("user1") should be (true)
|
||||
|
@ -666,10 +667,10 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
|
|||
|
||||
// Test only history ui admin acls are configured.
|
||||
val conf2 = createTestConf()
|
||||
.set(UI_ACLS_ENABLE, true)
|
||||
.set(UI_ADMIN_ACLS, "user1,user2")
|
||||
.set(UI_ADMIN_ACLS_GROUPS, "group1")
|
||||
.set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
|
||||
.set(HISTORY_SERVER_UI_ACLS_ENABLE, true)
|
||||
.set(HISTORY_SERVER_UI_ADMIN_ACLS, Seq("user1", "user2"))
|
||||
.set(HISTORY_SERVER_UI_ADMIN_ACLS_GROUPS, Seq("group1"))
|
||||
.set(USER_GROUPS_MAPPING, classOf[TestGroupsMappingProvider].getName)
|
||||
createAndCheck(conf2) { securityManager =>
|
||||
// Test whether user has permission to access UI.
|
||||
securityManager.checkUIViewPermissions("user1") should be (true)
|
||||
|
@ -686,8 +687,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
|
|||
|
||||
// Test neither history ui admin acls nor application acls are configured.
|
||||
val conf3 = createTestConf()
|
||||
.set(UI_ACLS_ENABLE, true)
|
||||
.set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
|
||||
.set(HISTORY_SERVER_UI_ACLS_ENABLE, true)
|
||||
.set(USER_GROUPS_MAPPING, classOf[TestGroupsMappingProvider].getName)
|
||||
createAndCheck(conf3) { securityManager =>
|
||||
// Test whether user has permission to access UI.
|
||||
securityManager.checkUIViewPermissions("user1") should be (false)
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.spark._
|
|||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.History._
|
||||
import org.apache.spark.internal.config.Tests.IS_TESTING
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.status.api.v1.ApplicationInfo
|
||||
import org.apache.spark.status.api.v1.JobData
|
||||
import org.apache.spark.ui.SparkUI
|
||||
|
@ -613,9 +614,9 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
|
|||
|
||||
stop()
|
||||
init(
|
||||
"spark.ui.filters" -> classOf[FakeAuthFilter].getName(),
|
||||
UI_ACLS_ENABLE.key -> "true",
|
||||
UI_ADMIN_ACLS.key -> admin)
|
||||
UI_FILTERS.key -> classOf[FakeAuthFilter].getName(),
|
||||
HISTORY_SERVER_UI_ACLS_ENABLE.key -> "true",
|
||||
HISTORY_SERVER_UI_ADMIN_ACLS.key -> admin)
|
||||
|
||||
val tests = Seq(
|
||||
(owner, HttpServletResponse.SC_OK),
|
||||
|
|
|
@ -39,6 +39,8 @@ import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
|
|||
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
|
||||
import org.apache.spark.deploy._
|
||||
import org.apache.spark.deploy.DeployMessages._
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
|
||||
import org.apache.spark.serializer
|
||||
|
||||
|
@ -104,7 +106,7 @@ class MasterSuite extends SparkFunSuite
|
|||
conf.set("spark.deploy.recoveryMode", "CUSTOM")
|
||||
conf.set("spark.deploy.recoveryMode.factory",
|
||||
classOf[CustomRecoveryModeFactory].getCanonicalName)
|
||||
conf.set("spark.master.rest.enabled", "false")
|
||||
conf.set(MASTER_REST_SERVER_ENABLED, false)
|
||||
|
||||
val instantiationAttempts = CustomRecoveryModeFactory.instantiationAttempts
|
||||
|
||||
|
@ -189,7 +191,7 @@ class MasterSuite extends SparkFunSuite
|
|||
conf.set("spark.deploy.recoveryMode", "CUSTOM")
|
||||
conf.set("spark.deploy.recoveryMode.factory",
|
||||
classOf[FakeRecoveryModeFactory].getCanonicalName)
|
||||
conf.set("spark.master.rest.enabled", "false")
|
||||
conf.set(MASTER_REST_SERVER_ENABLED, false)
|
||||
|
||||
val fakeAppInfo = makeAppInfo(1024)
|
||||
val fakeWorkerInfo = makeWorkerInfo(8192, 16)
|
||||
|
@ -286,8 +288,8 @@ class MasterSuite extends SparkFunSuite
|
|||
implicit val formats = org.json4s.DefaultFormats
|
||||
val reverseProxyUrl = "http://localhost:8080"
|
||||
val conf = new SparkConf()
|
||||
conf.set("spark.ui.reverseProxy", "true")
|
||||
conf.set("spark.ui.reverseProxyUrl", reverseProxyUrl)
|
||||
conf.set(UI_REVERSE_PROXY, true)
|
||||
conf.set(UI_REVERSE_PROXY_URL, reverseProxyUrl)
|
||||
val localCluster = new LocalSparkCluster(2, 2, 512, conf)
|
||||
localCluster.start()
|
||||
try {
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.scalatest.mockito.MockitoSugar
|
|||
import org.apache.spark._
|
||||
import org.apache.spark.TaskState.TaskState
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.memory.TestMemoryManager
|
||||
import org.apache.spark.metrics.MetricsSystem
|
||||
import org.apache.spark.rdd.RDD
|
||||
|
@ -169,7 +170,7 @@ class ExecutorSuite extends SparkFunSuite
|
|||
val conf = new SparkConf()
|
||||
.setMaster("local")
|
||||
.setAppName("executor thread test")
|
||||
.set("spark.ui.enabled", "false")
|
||||
.set(UI_ENABLED.key, "false")
|
||||
sc = new SparkContext(conf)
|
||||
val executorThread = sc.parallelize(Seq(1), 1).map { _ =>
|
||||
Thread.currentThread.getClass.getName
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.scalatest.Matchers
|
|||
import org.scalatest.concurrent.Eventually._
|
||||
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.internal.config.UI.UI_ENABLED
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
class LauncherBackendSuite extends SparkFunSuite with Matchers {
|
||||
|
@ -48,7 +49,7 @@ class LauncherBackendSuite extends SparkFunSuite with Matchers {
|
|||
val handle = new SparkLauncher(env)
|
||||
.setSparkHome(sys.props("spark.test.home"))
|
||||
.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
|
||||
.setConf("spark.ui.enabled", "false")
|
||||
.setConf(UI_ENABLED.key, "false")
|
||||
.setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, s"-Dtest.appender=console")
|
||||
.setMaster(master)
|
||||
.setAppResource(SparkLauncher.NO_RESOURCE)
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfter
|
|||
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.SparkFunSuite
|
||||
import org.apache.spark.internal.config.METRICS_CONF
|
||||
|
||||
class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
|
||||
var filePath: String = _
|
||||
|
@ -31,7 +32,7 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
|
|||
|
||||
test("MetricsConfig with default properties") {
|
||||
val sparkConf = new SparkConf(loadDefaults = false)
|
||||
sparkConf.set("spark.metrics.conf", "dummy-file")
|
||||
sparkConf.set(METRICS_CONF, "dummy-file")
|
||||
val conf = new MetricsConfig(sparkConf)
|
||||
conf.initialize()
|
||||
|
||||
|
@ -47,7 +48,7 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
|
|||
|
||||
test("MetricsConfig with properties set from a file") {
|
||||
val sparkConf = new SparkConf(loadDefaults = false)
|
||||
sparkConf.set("spark.metrics.conf", filePath)
|
||||
sparkConf.set(METRICS_CONF, filePath)
|
||||
val conf = new MetricsConfig(sparkConf)
|
||||
conf.initialize()
|
||||
|
||||
|
@ -110,7 +111,7 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
|
|||
setMetricsProperty(sparkConf, "*.source.jvm.class", "org.apache.spark.SomeOtherSource")
|
||||
setMetricsProperty(sparkConf, "master.sink.console.period", "50")
|
||||
setMetricsProperty(sparkConf, "master.sink.console.unit", "seconds")
|
||||
sparkConf.set("spark.metrics.conf", filePath)
|
||||
sparkConf.set(METRICS_CONF, filePath)
|
||||
val conf = new MetricsConfig(sparkConf)
|
||||
conf.initialize()
|
||||
|
||||
|
@ -135,7 +136,7 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
|
|||
|
||||
test("MetricsConfig with subProperties") {
|
||||
val sparkConf = new SparkConf(loadDefaults = false)
|
||||
sparkConf.set("spark.metrics.conf", filePath)
|
||||
sparkConf.set(METRICS_CONF, filePath)
|
||||
val conf = new MetricsConfig(sparkConf)
|
||||
conf.initialize()
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
|
|||
|
||||
before {
|
||||
filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile
|
||||
conf = new SparkConf(false).set("spark.metrics.conf", filePath)
|
||||
conf = new SparkConf(false).set(METRICS_CONF, filePath)
|
||||
securityMgr = new SecurityManager(conf)
|
||||
}
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.scalatest.Matchers
|
|||
import org.scalatest.mockito.MockitoSugar
|
||||
|
||||
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.network.{BlockDataManager, BlockTransferService}
|
||||
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
|
||||
import org.apache.spark.network.shuffle.BlockFetchingListener
|
||||
|
@ -50,8 +51,8 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
|
|||
|
||||
test("security on same password") {
|
||||
val conf = new SparkConf()
|
||||
.set("spark.authenticate", "true")
|
||||
.set("spark.authenticate.secret", "good")
|
||||
.set(NETWORK_AUTH_ENABLED, true)
|
||||
.set(AUTH_SECRET, "good")
|
||||
.set("spark.app.id", "app-id")
|
||||
testConnection(conf, conf) match {
|
||||
case Success(_) => // expected
|
||||
|
@ -61,10 +62,10 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
|
|||
|
||||
test("security on mismatch password") {
|
||||
val conf0 = new SparkConf()
|
||||
.set("spark.authenticate", "true")
|
||||
.set("spark.authenticate.secret", "good")
|
||||
.set(NETWORK_AUTH_ENABLED, true)
|
||||
.set(AUTH_SECRET, "good")
|
||||
.set("spark.app.id", "app-id")
|
||||
val conf1 = conf0.clone.set("spark.authenticate.secret", "bad")
|
||||
val conf1 = conf0.clone.set(AUTH_SECRET, "bad")
|
||||
testConnection(conf0, conf1) match {
|
||||
case Success(_) => fail("Should have failed")
|
||||
case Failure(t) => t.getMessage should include ("Mismatched response")
|
||||
|
@ -73,10 +74,10 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
|
|||
|
||||
test("security mismatch auth off on server") {
|
||||
val conf0 = new SparkConf()
|
||||
.set("spark.authenticate", "true")
|
||||
.set("spark.authenticate.secret", "good")
|
||||
.set(NETWORK_AUTH_ENABLED, true)
|
||||
.set(AUTH_SECRET, "good")
|
||||
.set("spark.app.id", "app-id")
|
||||
val conf1 = conf0.clone.set("spark.authenticate", "false")
|
||||
val conf1 = conf0.clone.set(NETWORK_AUTH_ENABLED, false)
|
||||
testConnection(conf0, conf1) match {
|
||||
case Success(_) => fail("Should have failed")
|
||||
case Failure(t) => // any funny error may occur, sever will interpret SASL token as RPC
|
||||
|
@ -85,10 +86,10 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
|
|||
|
||||
test("security mismatch auth off on client") {
|
||||
val conf0 = new SparkConf()
|
||||
.set("spark.authenticate", "false")
|
||||
.set("spark.authenticate.secret", "good")
|
||||
.set(NETWORK_AUTH_ENABLED, false)
|
||||
.set(AUTH_SECRET, "good")
|
||||
.set("spark.app.id", "app-id")
|
||||
val conf1 = conf0.clone.set("spark.authenticate", "true")
|
||||
val conf1 = conf0.clone.set(NETWORK_AUTH_ENABLED, true)
|
||||
testConnection(conf0, conf1) match {
|
||||
case Success(_) => fail("Should have failed")
|
||||
case Failure(t) => t.getMessage should include ("Expected SaslMessage")
|
||||
|
@ -97,8 +98,8 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
|
|||
|
||||
test("security with aes encryption") {
|
||||
val conf = new SparkConf()
|
||||
.set("spark.authenticate", "true")
|
||||
.set("spark.authenticate.secret", "good")
|
||||
.set(NETWORK_AUTH_ENABLED, true)
|
||||
.set(AUTH_SECRET, "good")
|
||||
.set("spark.app.id", "app-id")
|
||||
.set("spark.network.crypto.enabled", "true")
|
||||
.set("spark.network.crypto.saslFallback", "false")
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.scalatest.concurrent.Eventually._
|
|||
|
||||
import org.apache.spark.{SecurityManager, SparkConf, SparkEnv, SparkException, SparkFunSuite}
|
||||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.util.{ThreadUtils, Utils}
|
||||
|
||||
/**
|
||||
|
@ -693,42 +694,42 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
|
||||
test("send with authentication") {
|
||||
testSend(new SparkConf()
|
||||
.set("spark.authenticate", "true")
|
||||
.set("spark.authenticate.secret", "good"))
|
||||
.set(NETWORK_AUTH_ENABLED, true)
|
||||
.set(AUTH_SECRET, "good"))
|
||||
}
|
||||
|
||||
test("send with SASL encryption") {
|
||||
testSend(new SparkConf()
|
||||
.set("spark.authenticate", "true")
|
||||
.set("spark.authenticate.secret", "good")
|
||||
.set("spark.authenticate.enableSaslEncryption", "true"))
|
||||
.set(NETWORK_AUTH_ENABLED, true)
|
||||
.set(AUTH_SECRET, "good")
|
||||
.set(SASL_ENCRYPTION_ENABLED, true))
|
||||
}
|
||||
|
||||
test("send with AES encryption") {
|
||||
testSend(new SparkConf()
|
||||
.set("spark.authenticate", "true")
|
||||
.set("spark.authenticate.secret", "good")
|
||||
.set(NETWORK_AUTH_ENABLED, true)
|
||||
.set(AUTH_SECRET, "good")
|
||||
.set("spark.network.crypto.enabled", "true")
|
||||
.set("spark.network.crypto.saslFallback", "false"))
|
||||
}
|
||||
|
||||
test("ask with authentication") {
|
||||
testAsk(new SparkConf()
|
||||
.set("spark.authenticate", "true")
|
||||
.set("spark.authenticate.secret", "good"))
|
||||
.set(NETWORK_AUTH_ENABLED, true)
|
||||
.set(AUTH_SECRET, "good"))
|
||||
}
|
||||
|
||||
test("ask with SASL encryption") {
|
||||
testAsk(new SparkConf()
|
||||
.set("spark.authenticate", "true")
|
||||
.set("spark.authenticate.secret", "good")
|
||||
.set("spark.authenticate.enableSaslEncryption", "true"))
|
||||
.set(NETWORK_AUTH_ENABLED, true)
|
||||
.set(AUTH_SECRET, "good")
|
||||
.set(SASL_ENCRYPTION_ENABLED, true))
|
||||
}
|
||||
|
||||
test("ask with AES encryption") {
|
||||
testAsk(new SparkConf()
|
||||
.set("spark.authenticate", "true")
|
||||
.set("spark.authenticate.secret", "good")
|
||||
.set(NETWORK_AUTH_ENABLED, true)
|
||||
.set(AUTH_SECRET, "good")
|
||||
.set("spark.network.crypto.enabled", "true")
|
||||
.set("spark.network.crypto.saslFallback", "false"))
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter
|
|||
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.executor.{Executor, TaskMetrics, TaskMetricsSuite}
|
||||
import org.apache.spark.internal.config.METRICS_CONF
|
||||
import org.apache.spark.memory.TaskMemoryManager
|
||||
import org.apache.spark.metrics.source.JvmSource
|
||||
import org.apache.spark.network.util.JavaUtils
|
||||
|
@ -37,7 +38,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
|
|||
test("provide metrics sources") {
|
||||
val filePath = getClass.getClassLoader.getResource("test_metrics_config.properties").getFile
|
||||
val conf = new SparkConf(loadDefaults = false)
|
||||
.set("spark.metrics.conf", filePath)
|
||||
.set(METRICS_CONF, filePath)
|
||||
sc = new SparkContext("local", "test", conf)
|
||||
val rdd = sc.makeRDD(1 to 1)
|
||||
val result = sc.runJob(rdd, (tc: TaskContext, it: Iterator[Int]) => {
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.scalatest.concurrent.Eventually._
|
|||
import org.apache.spark._
|
||||
import org.apache.spark.broadcast.BroadcastManager
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.{DRIVER_PORT, MEMORY_OFFHEAP_SIZE}
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.Tests._
|
||||
import org.apache.spark.memory.UnifiedMemoryManager
|
||||
import org.apache.spark.network.BlockTransferService
|
||||
|
@ -86,7 +86,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
|
|||
before {
|
||||
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
|
||||
|
||||
conf.set("spark.authenticate", "false")
|
||||
conf.set(NETWORK_AUTH_ENABLED, false)
|
||||
conf.set(DRIVER_PORT, rpcEnv.address.port)
|
||||
conf.set(IS_TESTING, true)
|
||||
conf.set("spark.memory.fraction", "1")
|
||||
|
|
|
@ -28,7 +28,7 @@ import org.mockito.ArgumentMatchers.{any, eq => meq}
|
|||
import org.mockito.Mockito.{mock, times, verify, when}
|
||||
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.UI._
|
||||
|
||||
class HttpSecurityFilterSuite extends SparkFunSuite {
|
||||
|
||||
|
@ -92,9 +92,9 @@ class HttpSecurityFilterSuite extends SparkFunSuite {
|
|||
|
||||
test("perform access control") {
|
||||
val conf = new SparkConf(false)
|
||||
.set("spark.ui.acls.enable", "true")
|
||||
.set("spark.admin.acls", "admin")
|
||||
.set("spark.ui.view.acls", "alice")
|
||||
.set(ACLS_ENABLE, true)
|
||||
.set(ADMIN_ACLS, Seq("admin"))
|
||||
.set(UI_VIEW_ACLS, Seq("alice"))
|
||||
val secMgr = new SecurityManager(conf)
|
||||
|
||||
val req = mockEmptyRequest()
|
||||
|
@ -123,7 +123,7 @@ class HttpSecurityFilterSuite extends SparkFunSuite {
|
|||
|
||||
test("set security-related headers") {
|
||||
val conf = new SparkConf(false)
|
||||
.set("spark.ui.allowFramingFrom", "example.com")
|
||||
.set(UI_ALLOW_FRAMING_FROM, "example.com")
|
||||
.set(UI_X_XSS_PROTECTION, "xssProtection")
|
||||
.set(UI_X_CONTENT_TYPE_OPTIONS, true)
|
||||
.set(UI_STRICT_TRANSPORT_SECURITY, "tsec")
|
||||
|
|
|
@ -39,8 +39,9 @@ import org.apache.spark._
|
|||
import org.apache.spark.LocalSparkContext._
|
||||
import org.apache.spark.api.java.StorageLevels
|
||||
import org.apache.spark.deploy.history.HistoryServerSuite
|
||||
import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.Status._
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.shuffle.FetchFailedException
|
||||
import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus}
|
||||
|
||||
|
@ -103,9 +104,9 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
|
|||
val conf = new SparkConf()
|
||||
.setMaster("local")
|
||||
.setAppName("test")
|
||||
.set("spark.ui.enabled", "true")
|
||||
.set("spark.ui.port", "0")
|
||||
.set("spark.ui.killEnabled", killEnabled.toString)
|
||||
.set(UI_ENABLED, true)
|
||||
.set(UI_PORT, 0)
|
||||
.set(UI_KILL_ENABLED, killEnabled)
|
||||
.set(MEMORY_OFFHEAP_SIZE.key, "64m")
|
||||
val sc = new SparkContext(conf)
|
||||
assert(sc.ui.isDefined)
|
||||
|
@ -531,8 +532,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
|
|||
val conf = new SparkConf()
|
||||
.setMaster("local")
|
||||
.setAppName("test")
|
||||
.set("spark.ui.enabled", "true")
|
||||
.set("spark.ui.port", "0")
|
||||
.set(UI_ENABLED, true)
|
||||
.set(UI_PORT, 0)
|
||||
.set(MAX_RETAINED_STAGES, 3)
|
||||
.set(MAX_RETAINED_JOBS, 2)
|
||||
.set(ASYNC_TRACKING_ENABLED, false)
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.scalatest.time.SpanSugar._
|
|||
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.LocalSparkContext._
|
||||
import org.apache.spark.internal.config.UI.UI_ENABLED
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
class UISuite extends SparkFunSuite {
|
||||
|
@ -44,7 +45,7 @@ class UISuite extends SparkFunSuite {
|
|||
val conf = new SparkConf()
|
||||
.setMaster("local")
|
||||
.setAppName("test")
|
||||
.set("spark.ui.enabled", "true")
|
||||
.set(UI_ENABLED, true)
|
||||
val sc = new SparkContext(conf)
|
||||
assert(sc.ui.isDefined)
|
||||
sc
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.spark.deploy.k8s.Config._
|
|||
import org.apache.spark.deploy.k8s.Constants._
|
||||
import org.apache.spark.deploy.k8s.submit._
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.ui.SparkUI
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.spark.deploy.k8s.Config._
|
|||
import org.apache.spark.deploy.k8s.Constants._
|
||||
import org.apache.spark.deploy.k8s.submit._
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.ui.SparkUI
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
|
@ -69,7 +70,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
|
|||
val expectedPortNames = Set(
|
||||
containerPort(DRIVER_PORT_NAME, DEFAULT_DRIVER_PORT),
|
||||
containerPort(BLOCK_MANAGER_PORT_NAME, DEFAULT_BLOCKMANAGER_PORT),
|
||||
containerPort(UI_PORT_NAME, SparkUI.DEFAULT_PORT)
|
||||
containerPort(UI_PORT_NAME, UI_PORT.defaultValue.get)
|
||||
)
|
||||
val foundPortNames = configuredPod.container.getPorts.asScala.toSet
|
||||
assert(expectedPortNames === foundPortNames)
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.scalatest.concurrent.Eventually
|
|||
import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.Tests.IS_TESTING
|
||||
import org.apache.spark.internal.config.UI.UI_ENABLED
|
||||
|
||||
private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) {
|
||||
|
||||
|
@ -67,8 +68,8 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl
|
|||
.set("spark.executor.cores", "1")
|
||||
.set("spark.executors.instances", "1")
|
||||
.set("spark.app.name", "spark-test-app")
|
||||
.set("spark.ui.enabled", "true")
|
||||
.set(IS_TESTING.key, "false")
|
||||
.set(IS_TESTING, false)
|
||||
.set(UI_ENABLED, true)
|
||||
.set("spark.kubernetes.submission.waitAppCompletion", "false")
|
||||
.set("spark.kubernetes.authenticate.driver.serviceAccountName", serviceAccountName)
|
||||
}
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.spark.deploy.history.HistoryServer
|
|||
import org.apache.spark.deploy.yarn.config._
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.metrics.MetricsSystem
|
||||
import org.apache.spark.rpc._
|
||||
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
|
||||
|
@ -224,7 +225,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
|
|||
val attemptID = if (isClusterMode) {
|
||||
// Set the web ui port to be ephemeral for yarn so we don't conflict with
|
||||
// other spark processes running on the same box
|
||||
System.setProperty("spark.ui.port", "0")
|
||||
System.setProperty(UI_PORT.key, "0")
|
||||
|
||||
// Set the master and deploy mode property to match the requested mode.
|
||||
System.setProperty("spark.master", "yarn")
|
||||
|
@ -620,7 +621,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
|
|||
d.send(AddWebUIFilter(amFilter, params.toMap, proxyBase))
|
||||
|
||||
case None =>
|
||||
System.setProperty("spark.ui.filters", amFilter)
|
||||
System.setProperty(UI_FILTERS.key, amFilter)
|
||||
params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.spark.SparkContext
|
|||
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
|
||||
import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.rpc._
|
||||
import org.apache.spark.scheduler._
|
||||
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
|
||||
|
@ -175,7 +176,7 @@ private[spark] abstract class YarnSchedulerBackend(
|
|||
filterParams != null && filterParams.nonEmpty
|
||||
if (hasFilter) {
|
||||
// SPARK-26255: Append user provided filters(spark.ui.filters) with yarn filter.
|
||||
val allFilters = filterName + "," + conf.get("spark.ui.filters", "")
|
||||
val allFilters = Seq(filterName) ++ conf.get(UI_FILTERS)
|
||||
logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
|
||||
|
||||
// For already installed handlers, prepend the filter.
|
||||
|
@ -186,7 +187,7 @@ private[spark] abstract class YarnSchedulerBackend(
|
|||
filterParams.foreach { case (k, v) =>
|
||||
conf.set(s"spark.$filterName.param.$k", v)
|
||||
}
|
||||
conf.set("spark.ui.filters", allFilters)
|
||||
conf.set(UI_FILTERS, allFilters)
|
||||
|
||||
ui.getHandlers.map(_.getServletHandler()).foreach { h =>
|
||||
val holder = new FilterHolder()
|
||||
|
|
|
@ -36,6 +36,8 @@ import org.apache.spark._
|
|||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
import org.apache.spark.deploy.yarn.config._
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.launcher._
|
||||
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
|
||||
SparkListenerExecutorAdded}
|
||||
|
@ -192,7 +194,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
|
|||
val propsFile = createConfFile()
|
||||
val handle = new SparkLauncher(env)
|
||||
.setSparkHome(sys.props("spark.test.home"))
|
||||
.setConf("spark.ui.enabled", "false")
|
||||
.setConf(UI_ENABLED.key, "false")
|
||||
.setPropertiesFile(propsFile)
|
||||
.setMaster("yarn")
|
||||
.setDeployMode("client")
|
||||
|
|
|
@ -30,6 +30,8 @@ import org.scalatest.Matchers
|
|||
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
|
||||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config._
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.util.{ResetSystemProperties, Utils}
|
||||
|
||||
class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
|
||||
|
@ -83,7 +85,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
|
|||
|
||||
// spark acls on, just pick up default user
|
||||
val sparkConf = new SparkConf()
|
||||
sparkConf.set("spark.acls.enable", "true")
|
||||
sparkConf.set(ACLS_ENABLE, true)
|
||||
|
||||
val securityMgr = new SecurityManager(sparkConf)
|
||||
val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
|
||||
|
@ -111,9 +113,9 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
|
|||
|
||||
// default spark acls are on and specify acls
|
||||
val sparkConf = new SparkConf()
|
||||
sparkConf.set("spark.acls.enable", "true")
|
||||
sparkConf.set("spark.ui.view.acls", "user1,user2")
|
||||
sparkConf.set("spark.modify.acls", "user3,user4")
|
||||
sparkConf.set(ACLS_ENABLE, true)
|
||||
sparkConf.set(UI_VIEW_ACLS, Seq("user1", "user2"))
|
||||
sparkConf.set(MODIFY_ACLS, Seq("user3", "user4"))
|
||||
|
||||
val securityMgr = new SecurityManager(sparkConf)
|
||||
val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.spark.sql
|
|||
import org.scalatest.BeforeAndAfterEach
|
||||
|
||||
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
|
||||
import org.apache.spark.internal.config.UI.UI_ENABLED
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
|
||||
/**
|
||||
|
@ -38,7 +39,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
|
|||
test("create with config options and propagate them to SparkContext and SparkSession") {
|
||||
val session = SparkSession.builder()
|
||||
.master("local")
|
||||
.config("spark.ui.enabled", value = false)
|
||||
.config(UI_ENABLED.key, value = false)
|
||||
.config("some-config", "v2")
|
||||
.getOrCreate()
|
||||
assert(session.sparkContext.conf.get("some-config") == "v2")
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
|
|||
import org.scalatest.BeforeAndAfterAll
|
||||
|
||||
import org.apache.spark.{MapOutputStatistics, SparkConf, SparkFunSuite}
|
||||
import org.apache.spark.internal.config.UI.UI_ENABLED
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ReusedExchangeExec, ShuffleExchangeExec}
|
||||
import org.apache.spark.sql.functions._
|
||||
|
@ -262,7 +263,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
new SparkConf(false)
|
||||
.setMaster("local[*]")
|
||||
.setAppName("test")
|
||||
.set("spark.ui.enabled", "false")
|
||||
.set(UI_ENABLED, false)
|
||||
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
|
||||
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
|
||||
.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
|
||||
|
|
|
@ -23,6 +23,7 @@ import scala.util.Random
|
|||
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
import org.apache.spark.sql.catalyst.plans.SQLHelper
|
||||
|
@ -50,7 +51,7 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
|
|||
.set("spark.master", "local[1]")
|
||||
.setIfMissing("spark.driver.memory", "3g")
|
||||
.setIfMissing("spark.executor.memory", "3g")
|
||||
.setIfMissing("spark.ui.enabled", "false")
|
||||
.setIfMissing(UI_ENABLED, false)
|
||||
|
||||
val spark = SparkSession.builder.config(conf).getOrCreate()
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import scala.util.Random
|
|||
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.sql.{DataFrame, SparkSession}
|
||||
import org.apache.spark.sql.catalyst.plans.SQLHelper
|
||||
import org.apache.spark.sql.functions.monotonically_increasing_id
|
||||
|
@ -48,7 +49,7 @@ object FilterPushdownBenchmark extends BenchmarkBase with SQLHelper {
|
|||
.set("spark.master", "local[1]")
|
||||
.setIfMissing("spark.driver.memory", "3g")
|
||||
.setIfMissing("spark.executor.memory", "3g")
|
||||
.setIfMissing("spark.ui.enabled", "false")
|
||||
.setIfMissing(UI_ENABLED, false)
|
||||
.setIfMissing("orc.compression", "snappy")
|
||||
.setIfMissing("spark.sql.parquet.compression.codec", "snappy")
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hive.service.server.HiveServer2
|
|||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.annotation.DeveloperApi
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.UI.UI_ENABLED
|
||||
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.sql.hive.HiveUtils
|
||||
|
@ -63,7 +64,7 @@ object HiveThriftServer2 extends Logging {
|
|||
server.start()
|
||||
listener = new HiveThriftServer2Listener(server, sqlContext.conf)
|
||||
sqlContext.sparkContext.addSparkListener(listener)
|
||||
uiTab = if (sqlContext.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
|
||||
uiTab = if (sqlContext.sparkContext.getConf.get(UI_ENABLED)) {
|
||||
Some(new ThriftServerTab(sqlContext.sparkContext))
|
||||
} else {
|
||||
None
|
||||
|
@ -101,7 +102,7 @@ object HiveThriftServer2 extends Logging {
|
|||
logInfo("HiveThriftServer2 started")
|
||||
listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
|
||||
SparkSQLEnv.sparkContext.addSparkListener(listener)
|
||||
uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
|
||||
uiTab = if (SparkSQLEnv.sparkContext.getConf.get(UI_ENABLED)) {
|
||||
Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
|
||||
} else {
|
||||
None
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
|
|||
|
||||
import org.apache.spark.{SparkConf, SparkContext}
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, SQLContext}
|
||||
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
|
||||
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
|
||||
|
@ -59,7 +60,7 @@ object TestHive
|
|||
"org.apache.spark.sql.hive.execution.PairSerDe")
|
||||
.set("spark.sql.warehouse.dir", TestHiveContext.makeWarehouseDir().toURI.getPath)
|
||||
// SPARK-8910
|
||||
.set("spark.ui.enabled", "false")
|
||||
.set(UI_ENABLED, false)
|
||||
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
|
||||
// Disable ConvertToLocalRelation for better test coverage. Test cases built on
|
||||
// LocalRelation will exercise the optimization rules better by disabling it as
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.scalatest.{BeforeAndAfterEach, Matchers}
|
|||
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.UI.UI_ENABLED
|
||||
import org.apache.spark.sql.{QueryTest, Row, SparkSession}
|
||||
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
|
||||
import org.apache.spark.sql.catalyst.catalog._
|
||||
|
@ -335,7 +336,7 @@ object SetMetastoreURLTest extends Logging {
|
|||
val sparkConf = new SparkConf(loadDefaults = true)
|
||||
val builder = SparkSession.builder()
|
||||
.config(sparkConf)
|
||||
.config("spark.ui.enabled", "false")
|
||||
.config(UI_ENABLED.key, "false")
|
||||
.config("spark.sql.hive.metastore.version", "0.13.1")
|
||||
// The issue described in SPARK-16901 only appear when
|
||||
// spark.sql.hive.metastore.jars is not set to builtin.
|
||||
|
@ -370,7 +371,7 @@ object SetWarehouseLocationTest extends Logging {
|
|||
def main(args: Array[String]): Unit = {
|
||||
TestUtils.configTestLog4j("INFO")
|
||||
|
||||
val sparkConf = new SparkConf(loadDefaults = true).set("spark.ui.enabled", "false")
|
||||
val sparkConf = new SparkConf(loadDefaults = true).set(UI_ENABLED, false)
|
||||
val providedExpectedWarehouseLocation =
|
||||
sparkConf.getOption("spark.sql.test.expectedWarehouseDir")
|
||||
|
||||
|
@ -449,7 +450,7 @@ object TemporaryHiveUDFTest extends Logging {
|
|||
def main(args: Array[String]) {
|
||||
TestUtils.configTestLog4j("INFO")
|
||||
val conf = new SparkConf()
|
||||
conf.set("spark.ui.enabled", "false")
|
||||
conf.set(UI_ENABLED, false)
|
||||
val sc = new SparkContext(conf)
|
||||
val hiveContext = new TestHiveContext(sc)
|
||||
|
||||
|
@ -487,7 +488,7 @@ object PermanentHiveUDFTest1 extends Logging {
|
|||
def main(args: Array[String]) {
|
||||
TestUtils.configTestLog4j("INFO")
|
||||
val conf = new SparkConf()
|
||||
conf.set("spark.ui.enabled", "false")
|
||||
conf.set(UI_ENABLED, false)
|
||||
val sc = new SparkContext(conf)
|
||||
val hiveContext = new TestHiveContext(sc)
|
||||
|
||||
|
@ -525,7 +526,7 @@ object PermanentHiveUDFTest2 extends Logging {
|
|||
def main(args: Array[String]) {
|
||||
TestUtils.configTestLog4j("INFO")
|
||||
val conf = new SparkConf()
|
||||
conf.set("spark.ui.enabled", "false")
|
||||
conf.set(UI_ENABLED, false)
|
||||
val sc = new SparkContext(conf)
|
||||
val hiveContext = new TestHiveContext(sc)
|
||||
// Load a Hive UDF from the jar.
|
||||
|
@ -561,7 +562,7 @@ object SparkSubmitClassLoaderTest extends Logging {
|
|||
TestUtils.configTestLog4j("INFO")
|
||||
val conf = new SparkConf()
|
||||
val hiveWarehouseLocation = Utils.createTempDir()
|
||||
conf.set("spark.ui.enabled", "false")
|
||||
conf.set(UI_ENABLED, false)
|
||||
conf.set("spark.sql.warehouse.dir", hiveWarehouseLocation.toString)
|
||||
val sc = new SparkContext(conf)
|
||||
val hiveContext = new TestHiveContext(sc)
|
||||
|
@ -654,7 +655,7 @@ object SparkSQLConfTest extends Logging {
|
|||
// For this simple test, we do not really clone this object.
|
||||
override def clone: SparkConf = this
|
||||
}
|
||||
conf.set("spark.ui.enabled", "false")
|
||||
conf.set(UI_ENABLED, false)
|
||||
val sc = new SparkContext(conf)
|
||||
val hiveContext = new TestHiveContext(sc)
|
||||
// Run a simple command to make sure all lazy vals in hiveContext get instantiated.
|
||||
|
@ -676,7 +677,7 @@ object SPARK_9757 extends QueryTest {
|
|||
new SparkConf()
|
||||
.set("spark.sql.hive.metastore.version", "0.13.1")
|
||||
.set("spark.sql.hive.metastore.jars", "maven")
|
||||
.set("spark.ui.enabled", "false")
|
||||
.set(UI_ENABLED, false)
|
||||
.set("spark.sql.warehouse.dir", hiveWarehouseLocation.toString))
|
||||
|
||||
val hiveContext = new TestHiveContext(sparkContext)
|
||||
|
@ -722,7 +723,7 @@ object SPARK_11009 extends QueryTest {
|
|||
|
||||
val sparkContext = new SparkContext(
|
||||
new SparkConf()
|
||||
.set("spark.ui.enabled", "false")
|
||||
.set(UI_ENABLED, false)
|
||||
.set("spark.sql.shuffle.partitions", "100"))
|
||||
|
||||
val hiveContext = new TestHiveContext(sparkContext)
|
||||
|
@ -753,7 +754,7 @@ object SPARK_14244 extends QueryTest {
|
|||
|
||||
val sparkContext = new SparkContext(
|
||||
new SparkConf()
|
||||
.set("spark.ui.enabled", "false")
|
||||
.set(UI_ENABLED, false)
|
||||
.set("spark.sql.shuffle.partitions", "100"))
|
||||
|
||||
val hiveContext = new TestHiveContext(sparkContext)
|
||||
|
@ -774,7 +775,7 @@ object SPARK_14244 extends QueryTest {
|
|||
object SPARK_18360 {
|
||||
def main(args: Array[String]): Unit = {
|
||||
val spark = SparkSession.builder()
|
||||
.config("spark.ui.enabled", "false")
|
||||
.config(UI_ENABLED.key, "false")
|
||||
.enableHiveSupport().getOrCreate()
|
||||
|
||||
val defaultDbLocation = spark.catalog.getDatabase("default").locationUri
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution
|
|||
import org.scalatest.BeforeAndAfterAll
|
||||
|
||||
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
|
||||
import org.apache.spark.internal.config.UI.UI_ENABLED
|
||||
import org.apache.spark.sql.hive.test.TestHiveContext
|
||||
|
||||
class ConcurrentHiveSuite extends SparkFunSuite with BeforeAndAfterAll {
|
||||
|
@ -27,7 +28,7 @@ class ConcurrentHiveSuite extends SparkFunSuite with BeforeAndAfterAll {
|
|||
test("Multiple Hive Instances") {
|
||||
(1 to 10).map { i =>
|
||||
val conf = new SparkConf()
|
||||
conf.set("spark.ui.enabled", "false")
|
||||
conf.set(UI_ENABLED, false)
|
||||
val ts =
|
||||
new TestHiveContext(new SparkContext("local", s"TestSQLContext$i", conf))
|
||||
ts.sparkSession.sql("SHOW TABLES").collect()
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
|
|||
import org.apache.spark.{SparkConf, SparkException}
|
||||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.io.CompressionCodec
|
||||
import org.apache.spark.streaming.scheduler.JobGenerator
|
||||
import org.apache.spark.util.Utils
|
||||
|
@ -61,7 +62,7 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
|
|||
"spark.yarn.principal",
|
||||
"spark.kerberos.keytab",
|
||||
"spark.kerberos.principal",
|
||||
"spark.ui.filters",
|
||||
UI_FILTERS.key,
|
||||
"spark.mesos.driver.frameworkId")
|
||||
|
||||
val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}
|
|||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
import org.apache.spark.input.FixedLengthBinaryInputFormat
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.rdd.{RDD, RDDOperationScope}
|
||||
import org.apache.spark.scheduler.LiveListenerBus
|
||||
import org.apache.spark.serializer.SerializationDebugger
|
||||
|
@ -188,7 +189,7 @@ class StreamingContext private[streaming] (
|
|||
private[streaming] val progressListener = new StreamingJobProgressListener(this)
|
||||
|
||||
private[streaming] val uiTab: Option[StreamingTab] =
|
||||
if (conf.getBoolean("spark.ui.enabled", true)) {
|
||||
if (conf.get(UI_ENABLED)) {
|
||||
Some(new StreamingTab(this))
|
||||
} else {
|
||||
None
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.scalatest.concurrent.Eventually._
|
|||
import org.scalatest.time.SpanSugar._
|
||||
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.internal.config.UI._
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
import org.apache.spark.storage.StreamBlockId
|
||||
import org.apache.spark.streaming.receiver._
|
||||
|
@ -200,7 +201,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
|
|||
val sparkConf = new SparkConf()
|
||||
.setMaster("local[4]") // must be at least 3 as we are going to start 2 receivers
|
||||
.setAppName(framework)
|
||||
.set("spark.ui.enabled", "true")
|
||||
.set(UI_ENABLED, true)
|
||||
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
|
||||
.set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1")
|
||||
val batchDuration = Milliseconds(500)
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.scalatest.selenium.WebBrowser
|
|||
import org.scalatest.time.SpanSugar._
|
||||
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.internal.config.UI.UI_ENABLED
|
||||
import org.apache.spark.ui.SparkUICssErrorHandler
|
||||
|
||||
/**
|
||||
|
@ -61,7 +62,7 @@ class UISeleniumSuite
|
|||
val conf = new SparkConf()
|
||||
.setMaster("local")
|
||||
.setAppName("test")
|
||||
.set("spark.ui.enabled", "true")
|
||||
.set(UI_ENABLED, true)
|
||||
val ssc = new StreamingContext(conf, Seconds(1))
|
||||
assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
|
||||
ssc
|
||||
|
|
Loading…
Reference in a new issue