From c17150a5f5a6c4f4a83ce8c055eab9fea78df08e Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Tue, 26 Feb 2019 14:49:46 -0800
Subject: [PATCH] [SPARK-22860][CORE][YARN] Redact command line arguments for running Driver and Executor before logging (standalone and YARN)

## What changes were proposed in this pull request?

This patch applies redaction to command line arguments before logging them. It covers two resource managers: standalone cluster and YARN.

The patch only handles arguments starting with `-D`, since Spark typically passes its configuration to the command line as `-Dspark.blabla=blabla`. Further changes would be needed to also handle the `--conf spark.blabla=blabla` form.
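For illustration, here is a minimal self-contained sketch of the behavior this patch adds (not the actual Spark code; the object name, redaction pattern, and replacement text below are assumed stand-ins for `spark.redaction.regex` and Spark's redaction marker):

```scala
object RedactionSketch {
  // Same -Dkey=value pattern as the one added to Utils in this patch.
  private val CommandLineArg = "-D(.+?)=(.+)".r
  // Assumed defaults for illustration only; Spark reads the real pattern from SparkConf.
  private val redactionPattern = "(?i)secret|password".r
  private val replacementText = "*********(redacted)"

  def redactCommandLineArgs(commands: Seq[String]): Seq[String] = commands.map {
    // Redact the value when either the key or the value matches the pattern.
    case CommandLineArg(key, value)
        if redactionPattern.findFirstIn(key).isDefined ||
          redactionPattern.findFirstIn(value).isDefined =>
      s"-D$key=$replacementText"
    // Arguments that do not look like -Dkey=value are left untouched.
    case cmd => cmd
  }

  def main(args: Array[String]): Unit = {
    redactCommandLineArgs(Seq(
      "-Dspark.executorEnv.HADOOP_CREDSTORE_PASSWORD=sensitive_value",
      "-Dspark.regular.property=regular_value",
      "-Ddummy")).foreach(println)
    // prints:
    //   -Dspark.executorEnv.HADOOP_CREDSTORE_PASSWORD=*********(redacted)
    //   -Dspark.regular.property=regular_value
    //   -Ddummy
  }
}
```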
## How was this patch tested?

Added a unit test for the redaction logic. The rest of the patch only changes what gets logged, so it is not easy to cover with unit tests.

Closes #23820 from HeartSaVioR/MINOR-redact-command-line-args-for-running-driver-executor.

Authored-by: Jungtaek Lim (HeartSaVioR)
Signed-off-by: Marcelo Vanzin
---
 .../spark/deploy/worker/DriverRunner.scala    |  9 ++--
 .../spark/deploy/worker/ExecutorRunner.scala  |  9 ++--
 .../scala/org/apache/spark/util/Utils.scala   | 13 +++++
 .../org/apache/spark/util/UtilsSuite.scala    | 52 +++++++++++++++++++
 .../spark/deploy/yarn/ExecutorRunnable.scala  |  2 +-
 5 files changed, 77 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 8c2a907b86..0c88119441 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -193,8 +193,9 @@ private[deploy] class DriverRunner(
       CommandUtils.redirectStream(process.getInputStream, stdout)
 
       val stderr = new File(baseDir, "stderr")
-      val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
-      val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
+      val redactedCommand = Utils.redactCommandLineArgs(conf, builder.command.asScala)
+        .mkString("\"", "\" \"", "\"")
+      val header = "Launch Command: %s\n%s\n\n".format(redactedCommand, "=" * 40)
       Files.append(header, stderr, StandardCharsets.UTF_8)
       CommandUtils.redirectStream(process.getErrorStream, stderr)
     }
@@ -210,8 +211,10 @@ private[deploy] class DriverRunner(
     val successfulRunDuration = 5
     var keepTrying = !killed
 
+    val redactedCommand = Utils.redactCommandLineArgs(conf, command.command)
+      .mkString("\"", "\" \"", "\"")
     while (keepTrying) {
-      logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))
+      logInfo("Launch Command: " + redactedCommand)
 
       synchronized {
         if (killed) { return exitCode }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index c74a95718d..ead28f13b5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 import com.google.common.io.Files
 
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
+import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.UI._
@@ -150,8 +150,9 @@ private[deploy] class ExecutorRunner(
       val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),
         memory, sparkHome.getAbsolutePath, substituteVariables)
       val command = builder.command()
-      val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
-      logInfo(s"Launch command: $formattedCommand")
+      val redactedCommand = Utils.redactCommandLineArgs(conf, command.asScala)
+        .mkString("\"", "\" \"", "\"")
+      logInfo(s"Launch command: $redactedCommand")
 
       builder.directory(executorDir)
       builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
@@ -171,7 +172,7 @@ private[deploy] class ExecutorRunner(
 
       process = builder.start()
       val header = "Spark Executor Command: %s\n%s\n\n".format(
-        formattedCommand, "=" * 40)
+        redactedCommand, "=" * 40)
 
       // Redirect its stdout and stderr to files
       val stdout = new File(executorDir, "stdout")
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3065bdf063..cade0dd88f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -98,6 +98,8 @@ private[spark] object Utils extends Logging {
   /** Scheme used for files that are locally available on worker nodes in the cluster. */
   val LOCAL_SCHEME = "local"
 
+  private val PATTERN_FOR_COMMAND_LINE_ARG = "-D(.+?)=(.+)".r
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
@@ -2617,6 +2619,17 @@ private[spark] object Utils extends Logging {
     redact(redactionPattern, kvs.toArray)
   }
 
+  def redactCommandLineArgs(conf: SparkConf, commands: Seq[String]): Seq[String] = {
+    val redactionPattern = conf.get(SECRET_REDACTION_PATTERN)
+    commands.map {
+      case PATTERN_FOR_COMMAND_LINE_ARG(key, value) =>
+        val (_, newValue) = redact(redactionPattern, Seq((key, value))).head
+        s"-D$key=$newValue"
+
+      case cmd => cmd
+    }
+  }
+
   def stringToSeq(str: String): Seq[String] = {
     str.split(",").map(_.trim()).filter(_.nonEmpty)
   }
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 6c2159eb45..fdd9771b6e 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -1016,6 +1016,58 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
 
   }
 
+  test("redact sensitive information in command line args") {
+    val sparkConf = new SparkConf
+
+    // Set some secret keys
+    val secretKeysWithSameValue = Seq(
+      "spark.executorEnv.HADOOP_CREDSTORE_PASSWORD",
+      "spark.my.password",
+      "spark.my.sECreT")
+    val cmdArgsForSecretWithSameValue = secretKeysWithSameValue.map(s => s"-D$s=sensitive_value")
+
+    val secretKeys = secretKeysWithSameValue ++ Seq("spark.your.password")
+    val cmdArgsForSecret = cmdArgsForSecretWithSameValue ++ Seq(
+      // Have '=' twice
+      "-Dspark.your.password=sensitive=sensitive2"
+    )
+
+    val ignoredArgs = Seq(
+      // starts with -D but no assignment
+      "-Ddummy",
+      // secret value contained not starting with -D (we don't care about this case for now)
+      "spark.my.password=sensitive_value",
+      // edge case: not started with -D, but matched pattern after first '-'
+      "--Dspark.my.password=sensitive_value")
+
+    val cmdArgs = cmdArgsForSecret ++ ignoredArgs ++ Seq(
+      // Set a non-secret key
+      "-Dspark.regular.property=regular_value",
+      // Set a property with a regular key but secret in the value
+      "-Dspark.sensitive.property=has_secret_in_value")
+
+    // Redact sensitive information
+    val redactedCmdArgs = Utils.redactCommandLineArgs(sparkConf, cmdArgs)
+
+    // These arguments should be left as they were:
+    // 1) argument without -D option is not applied
+    // 2) -D option without key-value assignment is not applied
+    assert(ignoredArgs.forall(redactedCmdArgs.contains))
+
+    val redactedCmdArgMap = redactedCmdArgs.filterNot(ignoredArgs.contains).map { cmd =>
+      val keyValue = cmd.substring("-D".length).split("=")
+      keyValue(0) -> keyValue.tail.mkString("=")
+    }.toMap
+
+    // Assert that secret information got redacted while the regular property remained the same
+    secretKeys.foreach { key =>
+      assert(redactedCmdArgMap(key) === Utils.REDACTION_REPLACEMENT_TEXT)
+    }
+
+    assert(redactedCmdArgMap("spark.regular.property") === "regular_value")
+    assert(redactedCmdArgMap("spark.sensitive.property") === Utils.REDACTION_REPLACEMENT_TEXT)
+  }
+
   test("tryWithSafeFinally") {
     var e = new Error("Block0")
     val finallyBlockError = new Error("Finally Block")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 2f8f2a0a11..7046ad7405 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -76,7 +76,7 @@ private[yarn] class ExecutorRunnable(
     |  env:
     |${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s"    $k -> $v\n" }.mkString}
     |  command:
-    |    ${commands.mkString(" \\ \n      ")}
+    |    ${Utils.redactCommandLineArgs(sparkConf, commands).mkString(" \\ \n      ")}
     |
     |  resources:
     |${localResources.map { case (k, v) => s"    $k -> $v\n" }.mkString}