[SPARK-19693][SQL] Make the SET mapreduce.job.reduces automatically converted to spark.sql.shuffle.partitions

## What changes were proposed in this pull request?
Automatically convert `SET mapreduce.job.reduces` to `spark.sql.shuffle.partitions`, mirroring the existing handling of the deprecated `SET mapred.reduce.tasks`.
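For illustration, a minimal sketch of the resulting behavior. The session setup here is an assumption (any `SparkSession` built with this patch behaves the same), and `MapreduceJobReducesDemo` is a hypothetical name:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical demo object; only the SET/get calls show the behavior under discussion.
object MapreduceJobReducesDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()

    spark.sql("SET mapreduce.job.reduces=10")  // logs a warning and sets the Spark key instead
    println(spark.conf.get("spark.sql.shuffle.partitions"))  // prints "10"

    // Values below 1 are rejected, as with the deprecated mapred.reduce.tasks:
    // spark.sql("SET mapreduce.job.reduces=-1")  // would throw IllegalArgumentException

    spark.stop()
  }
}
```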

## How was this patch tested?

Unit tests.

Author: Yuming Wang <wgyumg@gmail.com>

Closes #17020 from wangyum/SPARK-19693.
Authored by Yuming Wang on 2017-03-08 11:31:01 +00:00; committed by Sean Owen
parent 81303f7ca7
commit 3f9f9180c2
3 changed files with 33 additions and 0 deletions

Change to `SetCommand`:

@@ -60,6 +60,23 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
      }
      (keyValueOutput, runFunc)
    case Some((SQLConf.Replaced.MAPREDUCE_JOB_REDUCES, Some(value))) =>
      val runFunc = (sparkSession: SparkSession) => {
        logWarning(
          s"Property ${SQLConf.Replaced.MAPREDUCE_JOB_REDUCES} is Hadoop's property, " +
            s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
        if (value.toInt < 1) {
          val msg =
            s"Setting negative ${SQLConf.Replaced.MAPREDUCE_JOB_REDUCES} for automatically " +
              "determining the number of reducers is not supported."
          throw new IllegalArgumentException(msg)
        } else {
          sparkSession.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, value)
          Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value))
        }
      }
      (keyValueOutput, runFunc)
    case Some((key @ SetCommand.VariableName(name), Some(value))) =>
      val runFunc = (sparkSession: SparkSession) => {
        sparkSession.conf.set(name, value)

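Two details of the guard above are easy to miss: `value.toInt < 1` rejects 0 as well as strictly negative values (despite the "negative" wording of the message), and non-numeric input surfaces as a `NumberFormatException` from `toInt`. A condensed standalone sketch of that validation, with hypothetical names (`ReducerGuard` and `parseReducers` are not part of the patch):

```scala
object ReducerGuard {
  def parseReducers(value: String): Int = {
    val n = value.toInt // non-numeric input throws NumberFormatException here
    if (n < 1) {        // rejects 0 as well as negative values
      throw new IllegalArgumentException(
        "Setting negative mapreduce.job.reduces for automatically " +
          "determining the number of reducers is not supported.")
    }
    n
  }

  def main(args: Array[String]): Unit = {
    assert(parseReducers("8") == 8)
    try { parseReducers("-1"); assert(false, "expected IllegalArgumentException") }
    catch { case _: IllegalArgumentException => () }
    println("guard behaves as expected")
  }
}
```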
Change to `SQLConf`:

@@ -677,6 +677,10 @@ object SQLConf {
  object Deprecated {
    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
  }

  object Replaced {
    val MAPREDUCE_JOB_REDUCES = "mapreduce.job.reduces"
  }
}

/**

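The naming reflects the two Hadoop generations: the `Deprecated` bucket holds the old MR1 key, which Spark still accepts with a warning, while `Replaced` holds its MR2 successor added here; `SetCommand` translates both to `spark.sql.shuffle.partitions`. A standalone sketch of that key translation under illustrative names (`ShuffleKeyTranslation` and `translate` are not Spark API):

```scala
object ShuffleKeyTranslation {
  val MapredReduceTasks = "mapred.reduce.tasks"     // deprecated MR1 key
  val MapreduceJobReduces = "mapreduce.job.reduces" // MR2 key handled by this patch
  val ShufflePartitions = "spark.sql.shuffle.partitions"

  /** Map either Hadoop reducer-count key to Spark's shuffle-partition key. */
  def translate(key: String): String = key match {
    case MapredReduceTasks | MapreduceJobReduces => ShufflePartitions
    case other => other
  }

  def main(args: Array[String]): Unit = {
    assert(translate("mapreduce.job.reduces") == "spark.sql.shuffle.partitions")
    assert(translate("spark.executor.memory") == "spark.executor.memory")
    println("translation ok")
  }
}
```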
New test in `SQLQuerySuite`:

@@ -1019,6 +1019,18 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
    spark.sessionState.conf.clear()
  }

  test("SET mapreduce.job.reduces automatically converted to spark.sql.shuffle.partitions") {
    spark.sessionState.conf.clear()
    val before = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS.key).toInt
    val newConf = before + 1
    sql(s"SET mapreduce.job.reduces=${newConf.toString}")
    val after = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS.key).toInt
    assert(before != after)
    assert(newConf === after)
    intercept[IllegalArgumentException](sql(s"SET mapreduce.job.reduces=-1"))
    spark.sessionState.conf.clear()
  }

  test("apply schema") {
    val schema1 = StructType(
      StructField("f1", IntegerType, false) ::