From dbba3a33bc6712bec6c6cfa3dbb872c42dd0fbff Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Mon, 10 Jun 2019 11:58:37 -0700 Subject: [PATCH] [SPARK-27947][SQL] Enhance redactOptions to accept any Map type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What changes were proposed in this pull request? Handle the case when a ParsedStatement subclass has a Map field that is not of type Map[String, String]. In ParsedStatement.productIterator, `case mapArg: Map[_, _]` can match any Map type due to type erasure, thus causing `asInstanceOf[Map[String, String]]` to throw ClassCastException. The following test reproduces the issue: ``` case class TestStatement(p: Map[String, Int]) extends ParsedStatement { override def output: Seq[Attribute] = Nil override def children: Seq[LogicalPlan] = Nil } TestStatement(Map("abc" -> 1)).toString ``` Changing the code to `case mapArg: Map[String, String]` will not help due to type erasure. As a matter of fact, the compiler gives this warning: ``` Warning:(41, 18) non-variable type argument String in type pattern scala.collection.immutable.Map[String,String] (the underlying of Map[String,String]) is unchecked since it is eliminated by erasure case mapArg: Map[String, String] => ``` ## How was this patch tested? Add 2 unit tests. Closes #24800 from jzhuge/SPARK-27947. 
Authored-by: John Zhuge Signed-off-by: Dongjoon Hyun --- .../scala/org/apache/spark/util/Utils.scala | 23 ++++++++++++------- .../org/apache/spark/util/UtilsSuite.scala | 13 +++++++++++ .../plans/logical/sql/ParsedStatement.scala | 2 +- .../apache/spark/sql/internal/SQLConf.scala | 2 +- 4 files changed, 30 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index bed50865e7..00135c3259 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2596,7 +2596,7 @@ private[spark] object Utils extends Logging { * Redact the sensitive values in the given map. If a map key matches the redaction pattern then * its value is replaced with a dummy text. */ - def redact(regex: Option[Regex], kvs: Seq[(String, String)]): Seq[(String, String)] = { + def redact[K, V](regex: Option[Regex], kvs: Seq[(K, V)]): Seq[(K, V)] = { regex match { case None => kvs case Some(r) => redact(r, kvs) @@ -2618,7 +2618,7 @@ private[spark] object Utils extends Logging { } } - private def redact(redactionPattern: Regex, kvs: Seq[(String, String)]): Seq[(String, String)] = { + private def redact[K, V](redactionPattern: Regex, kvs: Seq[(K, V)]): Seq[(K, V)] = { // If the sensitive information regex matches with either the key or the value, redact the value // While the original intent was to only redact the value if the key matched with the regex, // we've found that especially in verbose mode, the value of the property may contain sensitive @@ -2632,12 +2632,19 @@ private[spark] object Utils extends Logging { // arbitrary property contained the term 'password', we may redact the value from the UI and // logs. In order to work around it, user would have to make the spark.redaction.regex property // more specific. 
- kvs.map { case (key, value) => - redactionPattern.findFirstIn(key) - .orElse(redactionPattern.findFirstIn(value)) - .map { _ => (key, REDACTION_REPLACEMENT_TEXT) } - .getOrElse((key, value)) - } + kvs.map { + case (key: String, value: String) => + redactionPattern.findFirstIn(key) + .orElse(redactionPattern.findFirstIn(value)) + .map { _ => (key, REDACTION_REPLACEMENT_TEXT) } + .getOrElse((key, value)) + case (key, value: String) => + redactionPattern.findFirstIn(value) + .map { _ => (key, REDACTION_REPLACEMENT_TEXT) } + .getOrElse((key, value)) + case (key, value) => + (key, value) + }.asInstanceOf[Seq[(K, V)]] } /** diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index d2d9eb0633..84a7973303 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -1120,6 +1120,19 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(redactedCmdArgMap("spark.sensitive.property") === Utils.REDACTION_REPLACEMENT_TEXT) } + test("redact sensitive information in sequence of key value pairs") { + val secretKeys = Some("my.password".r) + assert(Utils.redact(secretKeys, Seq(("spark.my.password", "12345"))) === + Seq(("spark.my.password", Utils.REDACTION_REPLACEMENT_TEXT))) + assert(Utils.redact(secretKeys, Seq(("anything", "spark.my.password=12345"))) === + Seq(("anything", Utils.REDACTION_REPLACEMENT_TEXT))) + assert(Utils.redact(secretKeys, Seq((999, "spark.my.password=12345"))) === + Seq((999, Utils.REDACTION_REPLACEMENT_TEXT))) + // Do not redact when value type is not string + assert(Utils.redact(secretKeys, Seq(("my.password", 12345))) === + Seq(("my.password", 12345))) + } + test("tryWithSafeFinally") { var e = new Error("Block0") val finallyBlockError = new Error("Finally Block") diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedStatement.scala index 2942c4b1fc..23fc009fec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedStatement.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ParsedStatement.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan private[sql] abstract class ParsedStatement extends LogicalPlan { // Redact properties and options when parsed nodes are used by generic methods like toString override def productIterator: Iterator[Any] = super.productIterator.map { - case mapArg: Map[_, _] => conf.redactOptions(mapArg.asInstanceOf[Map[String, String]]) + case mapArg: Map[_, _] => conf.redactOptions(mapArg) case other => other } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 583db58a67..fee71723ed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2353,7 +2353,7 @@ class SQLConf extends Serializable with Logging { /** * Redacts the given option map according to the description of SQL_OPTIONS_REDACTION_PATTERN. */ - def redactOptions(options: Map[String, String]): Map[String, String] = { + def redactOptions[K, V](options: Map[K, V]): Map[K, V] = { val regexes = Seq( getConf(SQL_OPTIONS_REDACTION_PATTERN), SECRET_REDACTION_PATTERN.readFrom(reader))