[SPARK-25221][DEPLOY] Consistent trailing whitespace treatment of conf values
## What changes were proposed in this pull request?

Stop trimming values of properties loaded from a file.

## How was this patch tested?

Added a unit test demonstrating the issue hit in production.

Closes #22213 from gerashegalov/gera/SPARK-25221.

Authored-by: Gera Shegalov <gera@apache.org>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
This commit is contained in:
parent
77579aa8c3
commit
bcb9a8c83f
|
@@ -19,7 +19,6 @@ package org.apache.spark.util
|
||||||
|
|
||||||
import java.io._
|
import java.io._
|
||||||
import java.lang.{Byte => JByte}
|
import java.lang.{Byte => JByte}
|
||||||
import java.lang.InternalError
|
|
||||||
import java.lang.management.{LockInfo, ManagementFactory, MonitorInfo, ThreadInfo}
|
import java.lang.management.{LockInfo, ManagementFactory, MonitorInfo, ThreadInfo}
|
||||||
import java.lang.reflect.InvocationTargetException
|
import java.lang.reflect.InvocationTargetException
|
||||||
import java.math.{MathContext, RoundingMode}
|
import java.math.{MathContext, RoundingMode}
|
||||||
|
@@ -2052,6 +2051,30 @@ private[spark] object Utils extends Logging {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Trims a string the way JDK `java.lang.String#trim` does — stripping leading and trailing
 * characters with code points at or below '\u0020' (SPACE) — except that the natural line
 * delimiters '\r' and '\n' are preserved. Per the [[java.util.Properties]] load method,
 * natural line delimiters are consumed by the JDK during load, so any that remain were
 * explicitly provided and escaped by the user and must not be discarded.
 *
 * @param str the raw property value to trim
 * @return str with leading/trailing non-printables removed, CR/LF retained
 */
private[util] def trimExceptCRLF(str: String): String = {
  // A character survives trimming if it is printable (> SPACE) or a natural line delimiter.
  def isKept(ch: Char): Boolean = ch > ' ' || ch == '\r' || ch == '\n'

  val start = str.indexWhere(isKept)
  if (start < 0) {
    // No kept character at all: the entire string is trimmable whitespace.
    ""
  } else {
    // A kept character exists, so lastIndexWhere is guaranteed to be >= start.
    str.substring(start, str.lastIndexWhere(isKept) + 1)
  }
}
|
||||||
|
|
||||||
/** Load properties present in the given file. */
|
/** Load properties present in the given file. */
|
||||||
def getPropertiesFromFile(filename: String): Map[String, String] = {
|
def getPropertiesFromFile(filename: String): Map[String, String] = {
|
||||||
val file = new File(filename)
|
val file = new File(filename)
|
||||||
|
@@ -2062,8 +2085,10 @@ private[spark] object Utils extends Logging {
|
||||||
try {
|
try {
|
||||||
val properties = new Properties()
|
val properties = new Properties()
|
||||||
properties.load(inReader)
|
properties.load(inReader)
|
||||||
properties.stringPropertyNames().asScala.map(
|
properties.stringPropertyNames().asScala
|
||||||
k => (k, properties.getProperty(k).trim)).toMap
|
.map { k => (k, trimExceptCRLF(properties.getProperty(k))) }
|
||||||
|
.toMap
|
||||||
|
|
||||||
} catch {
|
} catch {
|
||||||
case e: IOException =>
|
case e: IOException =>
|
||||||
throw new SparkException(s"Failed when loading Spark properties from $filename", e)
|
throw new SparkException(s"Failed when loading Spark properties from $filename", e)
|
||||||
|
|
|
@@ -1144,6 +1144,53 @@ class SparkSubmitSuite
|
||||||
conf1.get(PY_FILES.key) should be (s"s3a://${pyFile.getAbsolutePath}")
|
conf1.get(PY_FILES.key) should be (s"s3a://${pyFile.getAbsolutePath}")
|
||||||
conf1.get("spark.submit.pyFiles") should (startWith("/"))
|
conf1.get("spark.submit.pyFiles") should (startWith("/"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verifies that CR/LF in property values survive both --conf and --properties-file,
// and that other leading/trailing whitespace is still trimmed from file-based values.
test("handles natural line delimiters in --properties-file and --conf uniformly") {
  val delimKey = "spark.my.delimiter."
  val LF = "\n"
  val CR = "\r"

  val lineFeedFromCommandLine = s"${delimKey}lineFeedFromCommandLine" -> LF
  val leadingDelimKeyFromFile = s"${delimKey}leadingDelimKeyFromFile" -> s"${LF}blah"
  val trailingDelimKeyFromFile = s"${delimKey}trailingDelimKeyFromFile" -> s"blah${CR}"
  val infixDelimFromFile = s"${delimKey}infixDelimFromFile" -> s"${CR}blah${LF}"
  val nonDelimSpaceFromFile = s"${delimKey}nonDelimSpaceFromFile" -> " blah\f"

  val testProps = Seq(leadingDelimKeyFromFile, trailingDelimKeyFromFile, infixDelimFromFile,
    nonDelimSpaceFromFile)

  // Write the file-based entries through java.util.Properties so the values are escaped
  // exactly as they would be in a user-authored properties file.
  val props = new java.util.Properties()
  val propsFile = File.createTempFile("test-spark-conf", ".properties",
    Utils.createTempDir())
  val out = new FileOutputStream(propsFile)
  try {
    for ((k, v) <- testProps) {
      props.put(k, v)
    }
    props.store(out, "test whitespace")
  } finally {
    out.close()
  }

  val cmdLineArgs = Seq(
    "--class", "org.SomeClass",
    "--conf", s"${lineFeedFromCommandLine._1}=${lineFeedFromCommandLine._2}",
    "--conf", "spark.master=yarn",
    "--properties-file", propsFile.getPath,
    "thejar.jar")

  val appArgs = new SparkSubmitArguments(cmdLineArgs)
  val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)

  // Values carrying natural line delimiters must round-trip unchanged, no matter
  // whether they came from the command line or the properties file.
  Seq(
    lineFeedFromCommandLine,
    leadingDelimKeyFromFile,
    trailingDelimKeyFromFile,
    infixDelimFromFile
  ).foreach { case (k, v) =>
    conf.get(k) should be (v)
  }

  // Non-delimiter whitespace (space, form feed) is still trimmed from file values.
  conf.get(nonDelimSpaceFromFile._1) should be ("blah")
}
|
||||||
}
|
}
|
||||||
|
|
||||||
object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
|
object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
|
||||||
|
|
|
@@ -1205,6 +1205,34 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
|
||||||
assert(Utils.stringHalfWidth("\u0967\u0968\u0969") == 3)
|
assert(Utils.stringHalfWidth("\u0967\u0968\u0969") == 3)
|
||||||
// scalastyle:on nonascii
|
// scalastyle:on nonascii
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unit-tests Utils.trimExceptCRLF directly: leading/trailing non-printables
// (<= '\u0020') are stripped, but CR and LF are preserved wherever they appear.
test("trimExceptCRLF standalone") {
  val crlfSet = Set("\r", "\n")
  // Every non-printable char in [0, 32] except the natural line delimiters.
  val nonPrintableButCRLF = (0 to 32).map(_.toChar.toString).toSet -- crlfSet

  // identity for CRLF
  // Bug fix: the `===` result was previously discarded — the check asserted nothing.
  crlfSet.foreach { s => assert(Utils.trimExceptCRLF(s) === s) }

  // empty for other non-printables
  nonPrintableButCRLF.foreach { s => assert(Utils.trimExceptCRLF(s) === "") }

  // identity for a printable string
  assert(Utils.trimExceptCRLF("a") === "a")

  // identity for strings with CRLF
  crlfSet.foreach { s =>
    assert(Utils.trimExceptCRLF(s"${s}a") === s"${s}a")
    assert(Utils.trimExceptCRLF(s"a${s}") === s"a${s}")
    assert(Utils.trimExceptCRLF(s"b${s}b") === s"b${s}b")
  }

  // trim nonPrintableButCRLF except when inside a string
  nonPrintableButCRLF.foreach { s =>
    assert(Utils.trimExceptCRLF(s"${s}a") === "a")
    assert(Utils.trimExceptCRLF(s"a${s}") === "a")
    assert(Utils.trimExceptCRLF(s"b${s}b") === s"b${s}b")
  }
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class SimpleExtension
|
private class SimpleExtension
|
||||||
|
|
Loading…
Reference in a new issue