[SPARK-32221][K8S] Avoid possible errors due to incorrect file size or type supplied in spark conf

### What changes were proposed in this pull request?

Skip files that are binary or too large to fit within the configMap's maximum size.

### Why are the changes needed?

A configMap cannot hold binary files, and there is also a limit on how much data a configMap can hold.
This limit can be configured by the k8s cluster admin. This PR skips such files (with a warning) instead of failing with confusing runtime errors.
If such files were not skipped, they would cause mount errors or encoding errors (when binary files are submitted).
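
As a hedged illustration (not part of this diff), a submission could align Spark's cap with a cluster whose admin has raised the etcd request limit, using the property introduced below; the 3 MiB value is only an assumed example:

```scala
import org.apache.spark.SparkConf

// Hypothetical override of the new property added by this PR; 3 MiB is an
// assumed value, matching whatever limit the cluster admin has configured.
val conf = new SparkConf()
  .set("spark.kubernetes.configMap.maxSize", (3L * 1024 * 1024).toString)
```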

### Does this PR introduce _any_ user-facing change?

Yes. In simple terms, it avoids possible errors caused by oversight (for example, placing a large file or a binary file in SPARK_CONF_DIR) and thus improves the user experience.

### How was this patch tested?

Added relevant tests and improved existing tests.

Closes #30472 from ScrapCodes/SPARK-32221/avoid-conf-propagate-errors.

Lead-authored-by: Prashant Sharma <prashsh1@in.ibm.com>
Co-authored-by: Prashant Sharma <prashant@apache.org>
Signed-off-by: Prashant Sharma <prashsh1@in.ibm.com>
Prashant Sharma 2021-01-06 14:55:40 +05:30
parent 26d8df300a
commit f64dfa8727
4 changed files with 164 additions and 24 deletions

@@ -99,6 +99,14 @@ private[spark] object Config extends Logging {
.toSequence
.createWithDefault(Nil)
val CONFIG_MAP_MAXSIZE =
ConfigBuilder("spark.kubernetes.configMap.maxSize")
.doc("Max size limit for a config map. This is configurable as per" +
" https://etcd.io/docs/v3.4.0/dev-guide/limit/ on k8s server end.")
.version("3.1.0")
.longConf
.createWithDefault(1572864) // 1.5 MiB
val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver"
val KUBERNETES_AUTH_EXECUTOR_CONF_PREFIX = "spark.kubernetes.authenticate.executor"
val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = "spark.kubernetes.authenticate.driver.mounted"

@@ -18,15 +18,17 @@
package org.apache.spark.deploy.k8s.submit
import java.io.{File, StringWriter}
import java.nio.charset.MalformedInputException
import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.io.{Codec, Source}
import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, KeyToPath}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.{Constants, KubernetesUtils}
import org.apache.spark.deploy.k8s.{Config, Constants, KubernetesUtils}
import org.apache.spark.deploy.k8s.Constants.ENV_SPARK_CONF_DIR
import org.apache.spark.internal.Logging
@@ -54,8 +56,10 @@ private[spark] object KubernetesClientUtils extends Logging {
/**
* Build, file -> 'file's content' map of all the selected files in SPARK_CONF_DIR.
*/
def buildSparkConfDirFilesMap(configMapName: String,
sparkConf: SparkConf, resolvedPropertiesMap: Map[String, String]): Map[String, String] = {
def buildSparkConfDirFilesMap(
configMapName: String,
sparkConf: SparkConf,
resolvedPropertiesMap: Map[String, String]): Map[String, String] = synchronized {
val loadedConfFilesMap = KubernetesClientUtils.loadSparkConfDirFiles(sparkConf)
// Add resolved spark conf to the loaded configuration files map.
if (resolvedPropertiesMap.nonEmpty) {
@@ -90,29 +94,71 @@ private[spark] object KubernetesClientUtils extends Logging {
.build()
}
private def loadSparkConfDirFiles(conf: SparkConf): Map[String, String] = {
private def orderFilesBySize(confFiles: Seq[File]): Seq[File] = {
val fileToFileSizePairs = confFiles.map(f => (f, f.getName.length + f.length()))
// sort first by name and then by length, so that during tests we have consistent results.
fileToFileSizePairs.sortBy(f => f._1).sortBy(f => f._2).map(_._1)
}
// exposed for testing
private[submit] def loadSparkConfDirFiles(conf: SparkConf): Map[String, String] = {
val confDir = Option(conf.getenv(ENV_SPARK_CONF_DIR)).orElse(
conf.getOption("spark.home").map(dir => s"$dir/conf"))
val maxSize = conf.get(Config.CONFIG_MAP_MAXSIZE)
if (confDir.isDefined) {
val confFiles = listConfFiles(confDir.get)
logInfo(s"Spark configuration files loaded from $confDir : ${confFiles.mkString(",")}")
confFiles.map { file =>
val source = Source.fromFile(file)(Codec.UTF8)
val mapping = (file.getName -> source.mkString)
source.close()
mapping
}.toMap
val confFiles: Seq[File] = listConfFiles(confDir.get, maxSize)
val orderedConfFiles = orderFilesBySize(confFiles)
var truncatedMapSize: Long = 0
val truncatedMap = mutable.HashMap[String, String]()
val skippedFiles = mutable.HashSet[String]()
var source: Source = Source.fromString("") // init with empty source.
for (file <- orderedConfFiles) {
try {
source = Source.fromFile(file)(Codec.UTF8)
val (fileName, fileContent) = file.getName -> source.mkString
if ((truncatedMapSize + fileName.length + fileContent.length) < maxSize) {
truncatedMap.put(fileName, fileContent)
truncatedMapSize = truncatedMapSize + (fileName.length + fileContent.length)
} else {
skippedFiles.add(fileName)
}
} catch {
case e: MalformedInputException =>
logWarning(
s"Unable to read a non UTF-8 encoded file ${file.getAbsolutePath}. Skipping...", e)
None
} finally {
source.close()
}
}
if (truncatedMap.nonEmpty) {
logInfo(s"Spark configuration files loaded from $confDir :" +
s" ${truncatedMap.keys.mkString(",")}")
}
if (skippedFiles.nonEmpty) {
logWarning(s"Skipped conf file(s) ${skippedFiles.mkString(",")}, due to size constraint." +
s" Please see, config: `${Config.CONFIG_MAP_MAXSIZE.key}` for more details.")
}
truncatedMap.toMap
} else {
Map.empty[String, String]
}
}
private def listConfFiles(confDir: String): Seq[File] = {
// We exclude all the template files and user provided spark conf or properties.
// As spark properties are resolved in a different step.
private def listConfFiles(confDir: String, maxSize: Long): Seq[File] = {
// At the moment configmaps do not support storing binary content (i.e. skip jar,tar,gzip,zip),
// and configMaps do not allow for size greater than 1.5 MiB(configurable).
// https://etcd.io/docs/v3.4.0/dev-guide/limit/
def testIfTooLargeOrBinary(f: File): Boolean = (f.length() + f.getName.length > maxSize) ||
f.getName.matches(".*\\.(gz|zip|jar|tar)")
// We exclude all the template files and user provided spark conf or properties,
// Spark properties are resolved in a different step.
def testIfSparkConfOrTemplates(f: File) = f.getName.matches(".*\\.template") ||
f.getName.matches("spark.*(conf|properties)")
val fileFilter = (f: File) => {
f.isFile && !(f.getName.endsWith("template") ||
f.getName.matches("spark.*(conf|properties)"))
f.isFile && !testIfTooLargeOrBinary(f) && !testIfSparkConfOrTemplates(f)
}
val confFiles: Seq[File] = {
val dir = new File(confDir)
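
For readers skimming the hunk above, a minimal, self-contained sketch of the same filter-and-accumulate idea, outside the Spark helpers (simplified: sizes use `File.length()` bytes rather than the decoded UTF-8 content, and only `.template` suffixes are excluded):

```scala
import java.io.File

// Standalone sketch of the selection logic, not the Spark code itself.
def selectConfFiles(confDir: File, maxSize: Long): Seq[File] = {
  // Skip files that are individually too large or look like binary archives.
  def tooLargeOrBinary(f: File): Boolean =
    (f.length() + f.getName.length > maxSize) || f.getName.matches(".*\\.(gz|zip|jar|tar)")

  val candidates = confDir.listFiles().toSeq
    .filter(f => f.isFile && !tooLargeOrBinary(f) && !f.getName.endsWith(".template"))
    .sortBy(f => (f.getName.length + f.length(), f.getName)) // smallest first, name breaks ties

  // Keep adding files while the running total stays under maxSize; the rest are skipped.
  val (selected, _) = candidates.foldLeft((Seq.empty[File], 0L)) { case ((kept, used), f) =>
    val size = f.getName.length + f.length()
    if (used + size < maxSize) (kept :+ f, used + size) else (kept, used)
  }
  selected
}
```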

@@ -191,25 +191,32 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf2key=conf2value"))
}
test("All files from SPARK_CONF_DIR, except templates and spark config " +
test("All files from SPARK_CONF_DIR, " +
"except templates, spark config, binary files and are within size limit, " +
"should be populated to pod's configMap.") {
def testSetup: (SparkConf, Seq[String]) = {
val tempDir = Utils.createTempDir()
val sparkConf = new SparkConf(loadDefaults = false).setSparkHome(tempDir.getAbsolutePath)
val sparkConf = new SparkConf(loadDefaults = false)
.setSparkHome(tempDir.getAbsolutePath)
val tempConfDir = new File(s"${tempDir.getAbsolutePath}/conf")
tempConfDir.mkdir()
// File names - which should not get mounted on the resultant config map.
val filteredConfFileNames =
Set("spark-env.sh.template", "spark.properties", "spark-defaults.conf")
val confFileNames = for (i <- 1 to 5) yield s"testConf.$i" ++
Set("spark-env.sh.template", "spark.properties", "spark-defaults.conf",
"test.gz", "test2.jar", "non_utf8.txt")
val confFileNames = (for (i <- 1 to 5) yield s"testConf.$i") ++
List("spark-env.sh") ++ filteredConfFileNames
val testConfFiles = for (i <- confFileNames) yield {
val testConfFiles = (for (i <- confFileNames) yield {
val file = new File(s"${tempConfDir.getAbsolutePath}/$i")
Files.write(file.toPath, "conf1key=conf1value".getBytes(StandardCharsets.UTF_8))
if (i.startsWith("non_utf8")) { // filling some non-utf-8 binary
Files.write(file.toPath, Array[Byte](0x00.toByte, 0xA1.toByte))
} else {
Files.write(file.toPath, "conf1key=conf1value".getBytes(StandardCharsets.UTF_8))
}
file.getName
}
})
assert(tempConfDir.listFiles().length == confFileNames.length)
val expectedConfFiles: Seq[String] = testConfFiles.filterNot(filteredConfFileNames.contains)
(sparkConf, expectedConfFiles)

@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config
import org.apache.spark.util.Utils
class KubernetesClientUtilsSuite extends SparkFunSuite with BeforeAndAfter {
def testSetup(inputFiles: Map[String, Array[Byte]]): SparkConf = {
val tempDir = Utils.createTempDir()
val sparkConf = new SparkConf(loadDefaults = false)
.setSparkHome(tempDir.getAbsolutePath)
val tempConfDir = new File(s"${tempDir.getAbsolutePath}/conf")
tempConfDir.mkdir()
for (i <- inputFiles) yield {
val file = new File(s"${tempConfDir.getAbsolutePath}/${i._1}")
Files.write(file.toPath, i._2)
file.getName
}
sparkConf
}
test("verify load files, loads only allowed files and not the disallowed files.") {
val input: Map[String, Array[Byte]] = Map("test.txt" -> "test123", "z12.zip" -> "zZ",
"rere.jar" -> "@31", "spark.jar" -> "@31", "_test" -> "", "sample.conf" -> "conf")
.map(f => f._1 -> f._2.getBytes(StandardCharsets.UTF_8)) ++
Map("binary-file.conf" -> Array[Byte](0x00.toByte, 0xA1.toByte))
val sparkConf = testSetup(input)
val output = KubernetesClientUtils.loadSparkConfDirFiles(sparkConf)
val expectedOutput = Map("test.txt" -> "test123", "sample.conf" -> "conf", "_test" -> "")
assert(output === expectedOutput)
}
test("verify load files, truncates the content to maxSize, when keys are very large in number.") {
val input = (for (i <- 10000 to 1 by -1) yield (s"testConf.${i}" -> "test123456")).toMap
val sparkConf = testSetup(input.map(f => f._1 -> f._2.getBytes(StandardCharsets.UTF_8)))
.set(Config.CONFIG_MAP_MAXSIZE.key, "60")
val output = KubernetesClientUtils.loadSparkConfDirFiles(sparkConf)
val expectedOutput = Map("testConf.1" -> "test123456", "testConf.2" -> "test123456")
assert(output === expectedOutput)
val output1 = KubernetesClientUtils.loadSparkConfDirFiles(
sparkConf.set(Config.CONFIG_MAP_MAXSIZE.key, "250000"))
assert(output1 === input)
}
test("verify load files, truncates the content to maxSize, when keys are equal in length.") {
val input = (for (i <- 9 to 1 by -1) yield (s"testConf.${i}" -> "test123456")).toMap
val sparkConf = testSetup(input.map(f => f._1 -> f._2.getBytes(StandardCharsets.UTF_8)))
.set(Config.CONFIG_MAP_MAXSIZE.key, "80")
val output = KubernetesClientUtils.loadSparkConfDirFiles(sparkConf)
val expectedOutput = Map("testConf.1" -> "test123456", "testConf.2" -> "test123456",
"testConf.3" -> "test123456")
assert(output === expectedOutput)
}
}