[SPARK-22860][CORE][YARN] Redact command line arguments for running Driver and Executor before logging (standalone and YARN)
## What changes were proposed in this pull request?

This patch applies redaction to command line arguments before logging them. It covers two resource managers: standalone cluster and YARN. The patch only concerns arguments starting with `-D`, since Spark typically passes Spark configuration to the command line as `-Dspark.blabla=blabla`. Further change would be needed to also handle the `--conf spark.blabla=blabla` form.

## How was this patch tested?

Added a unit test for the redaction logic. Beyond that, the patch only changes what gets logged, which is not easy to cover with a unit test.

Closes #23820 from HeartSaVioR/MINOR-redact-command-line-args-for-running-driver-executor.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
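For illustration, here is a minimal sketch of the intended behavior. The object name, the sample command line, and the exact replacement text are illustrative (not part of the patch), and the sketch sits inside Spark's own package because `Utils` is `private[spark]`:

```scala
package org.apache.spark.util

import org.apache.spark.SparkConf

// Hypothetical demo object, not part of the patch.
object RedactionSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // A launch command as a worker would log it. The password key matches the
    // default spark.redaction.regex; the app name does not.
    val cmdArgs = Seq(
      "/usr/bin/java",
      "-Dspark.ssl.keyPassword=my_secret",
      "-Dspark.app.name=myApp")
    // Expected: the password value is replaced by Utils.REDACTION_REPLACEMENT_TEXT,
    // e.g. /usr/bin/java -Dspark.ssl.keyPassword=*********(redacted) -Dspark.app.name=myApp
    println(Utils.redactCommandLineArgs(conf, cmdArgs).mkString(" "))
  }
}
```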
commit c17150a5f5
parent afbff6446f
core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -193,8 +193,9 @@ private[deploy] class DriverRunner(
       CommandUtils.redirectStream(process.getInputStream, stdout)
 
       val stderr = new File(baseDir, "stderr")
-      val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
-      val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
+      val redactedCommand = Utils.redactCommandLineArgs(conf, builder.command.asScala)
+        .mkString("\"", "\" \"", "\"")
+      val header = "Launch Command: %s\n%s\n\n".format(redactedCommand, "=" * 40)
       Files.append(header, stderr, StandardCharsets.UTF_8)
       CommandUtils.redirectStream(process.getErrorStream, stderr)
     }
@@ -210,8 +211,10 @@ private[deploy] class DriverRunner(
     val successfulRunDuration = 5
     var keepTrying = !killed
 
+    val redactedCommand = Utils.redactCommandLineArgs(conf, command.command)
+      .mkString("\"", "\" \"", "\"")
     while (keepTrying) {
-      logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))
+      logInfo("Launch Command: " + redactedCommand)
 
       synchronized {
         if (killed) { return exitCode }
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 import com.google.common.io.Files
 
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
+import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.UI._
@@ -150,8 +150,9 @@ private[deploy] class ExecutorRunner(
       val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),
         memory, sparkHome.getAbsolutePath, substituteVariables)
       val command = builder.command()
-      val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
-      logInfo(s"Launch command: $formattedCommand")
+      val redactedCommand = Utils.redactCommandLineArgs(conf, command.asScala)
+        .mkString("\"", "\" \"", "\"")
+      logInfo(s"Launch command: $redactedCommand")
 
       builder.directory(executorDir)
       builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
@@ -171,7 +172,7 @@ private[deploy] class ExecutorRunner(
 
       process = builder.start()
       val header = "Spark Executor Command: %s\n%s\n\n".format(
-        formattedCommand, "=" * 40)
+        redactedCommand, "=" * 40)
 
       // Redirect its stdout and stderr to files
       val stdout = new File(executorDir, "stdout")
core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -98,6 +98,8 @@ private[spark] object Utils extends Logging {
   /** Scheme used for files that are locally available on worker nodes in the cluster. */
   val LOCAL_SCHEME = "local"
 
+  private val PATTERN_FOR_COMMAND_LINE_ARG = "-D(.+?)=(.+)".r
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
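A note on the pattern added above: the non-greedy `(.+?)` ends the key at the first `=`, so a value that itself contains `=` is captured whole. A self-contained sketch of the matching behavior (object name is illustrative):

```scala
// Standalone sketch; mirrors the pattern added to Utils above.
object PatternSketch {
  private val PATTERN_FOR_COMMAND_LINE_ARG = "-D(.+?)=(.+)".r

  def main(args: Array[String]): Unit = {
    // The key stops at the first '='; the rest, '=' included, is the value.
    "-Dspark.your.password=sensitive=sensitive2" match {
      case PATTERN_FOR_COMMAND_LINE_ARG(key, value) =>
        println(s"key: $key, value: $value")
        // Prints: key: spark.your.password, value: sensitive=sensitive2
      case _ =>
        println("no match")
    }
    // "-Ddummy" has no '=' so it does not match and passes through unredacted.
  }
}
```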
@@ -2617,6 +2619,17 @@ private[spark] object Utils extends Logging {
     redact(redactionPattern, kvs.toArray)
   }
 
+  def redactCommandLineArgs(conf: SparkConf, commands: Seq[String]): Seq[String] = {
+    val redactionPattern = conf.get(SECRET_REDACTION_PATTERN)
+    commands.map {
+      case PATTERN_FOR_COMMAND_LINE_ARG(key, value) =>
+        val (_, newValue) = redact(redactionPattern, Seq((key, value))).head
+        s"-D$key=$newValue"
+
+      case cmd => cmd
+    }
+  }
+
   def stringToSeq(str: String): Seq[String] = {
     str.split(",").map(_.trim()).filter(_.nonEmpty)
   }
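A hedged usage sketch of the new helper, exercising the same edge cases the test below covers (again placed inside Spark's package because `Utils` is `private[spark]`; expected results assume the default `spark.redaction.regex`):

```scala
package org.apache.spark.util

import org.apache.spark.SparkConf

// Hypothetical demo object, not part of the patch.
object RedactCommandLineArgsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val redacted = Utils.redactCommandLineArgs(conf, Seq(
      "-Dspark.my.password=sensitive_value",        // value redacted: key matches the regex
      "-Dspark.your.password=sensitive=sensitive2", // redacted: '=' inside the value is preserved
      "-Dspark.regular.property=regular_value",     // unchanged: neither key nor value matches
      "-Ddummy",                                    // unchanged: no '=' assignment
      "spark.my.password=oops"))                    // unchanged: does not start with -D
    redacted.foreach(println)
  }
}
```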
core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -1016,6 +1016,58 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
 
   }
 
+  test("redact sensitive information in command line args") {
+    val sparkConf = new SparkConf
+
+    // Set some secret keys
+    val secretKeysWithSameValue = Seq(
+      "spark.executorEnv.HADOOP_CREDSTORE_PASSWORD",
+      "spark.my.password",
+      "spark.my.sECreT")
+    val cmdArgsForSecretWithSameValue = secretKeysWithSameValue.map(s => s"-D$s=sensitive_value")
+
+    val secretKeys = secretKeysWithSameValue ++ Seq("spark.your.password")
+    val cmdArgsForSecret = cmdArgsForSecretWithSameValue ++ Seq(
+      // Have '=' twice
+      "-Dspark.your.password=sensitive=sensitive2"
+    )
+
+    val ignoredArgs = Seq(
+      // starts with -D but no assignment
+      "-Ddummy",
+      // secret value contained not starting with -D (we don't care about this case for now)
+      "spark.my.password=sensitive_value",
+      // edge case: not started with -D, but matched pattern after first '-'
+      "--Dspark.my.password=sensitive_value")
+
+    val cmdArgs = cmdArgsForSecret ++ ignoredArgs ++ Seq(
+      // Set a non-secret key
+      "-Dspark.regular.property=regular_value",
+      // Set a property with a regular key but secret in the value
+      "-Dspark.sensitive.property=has_secret_in_value")
+
+    // Redact sensitive information
+    val redactedCmdArgs = Utils.redactCommandLineArgs(sparkConf, cmdArgs)
+
+    // These arguments should be left as they were:
+    // 1) argument without -D option is not applied
+    // 2) -D option without key-value assignment is not applied
+    assert(ignoredArgs.forall(redactedCmdArgs.contains))
+
+    val redactedCmdArgMap = redactedCmdArgs.filterNot(ignoredArgs.contains).map { cmd =>
+      val keyValue = cmd.substring("-D".length).split("=")
+      keyValue(0) -> keyValue.tail.mkString("=")
+    }.toMap
+
+    // Assert that secret information got redacted while the regular property remained the same
+    secretKeys.foreach { key =>
+      assert(redactedCmdArgMap(key) === Utils.REDACTION_REPLACEMENT_TEXT)
+    }
+
+    assert(redactedCmdArgMap("spark.regular.property") === "regular_value")
+    assert(redactedCmdArgMap("spark.sensitive.property") === Utils.REDACTION_REPLACEMENT_TEXT)
+  }
+
   test("tryWithSafeFinally") {
     var e = new Error("Block0")
     val finallyBlockError = new Error("Finally Block")
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -76,7 +76,7 @@ private[yarn] class ExecutorRunnable(
       | env:
       |${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s"  $k -> $v\n" }.mkString}
       | command:
-      |  ${commands.mkString(" \\ \n  ")}
+      |  ${Utils.redactCommandLineArgs(sparkConf, commands).mkString(" \\ \n  ")}
 
       | resources:
       |${localResources.map { case (k, v) => s"  $k -> $v\n" }.mkString}