[SPARK-35168][SQL] mapred.reduce.tasks should be shuffle.partitions not adaptive.coalescePartitions.initialPartitionNum

### What changes were proposed in this pull request?

```sql
spark-sql> set spark.sql.adaptive.coalescePartitions.initialPartitionNum=1;
spark.sql.adaptive.coalescePartitions.initialPartitionNum	1
Time taken: 2.18 seconds, Fetched 1 row(s)
spark-sql> set mapred.reduce.tasks;
21/04/21 14:27:11 WARN SetCommand: Property mapred.reduce.tasks is deprecated, showing spark.sql.shuffle.partitions instead.
spark.sql.shuffle.partitions	1
Time taken: 0.03 seconds, Fetched 1 row(s)
spark-sql> set spark.sql.shuffle.partitions;
spark.sql.shuffle.partitions	200
Time taken: 0.024 seconds, Fetched 1 row(s)
spark-sql> set mapred.reduce.tasks=2;
21/04/21 14:31:52 WARN SetCommand: Property mapred.reduce.tasks is deprecated, automatically converted to spark.sql.shuffle.partitions instead.
spark.sql.shuffle.partitions	2
Time taken: 0.017 seconds, Fetched 1 row(s)
spark-sql> set mapred.reduce.tasks;
21/04/21 14:31:55 WARN SetCommand: Property mapred.reduce.tasks is deprecated, showing spark.sql.shuffle.partitions instead.
spark.sql.shuffle.partitions	1
Time taken: 0.017 seconds, Fetched 1 row(s)
spark-sql>
```

`mapred.reduce.tasks` is mapping to `spark.sql.shuffle.partitions` at write-side, but `spark.sql.adaptive.coalescePartitions.initialPartitionNum` might take precede of `spark.sql.shuffle.partitions`

### Why are the changes needed?

roundtrip for `mapred.reduce.tasks`

### Does this PR introduce _any_ user-facing change?

yes, `mapred.reduce.tasks` will always report `spark.sql.shuffle.partitions` whether `spark.sql.adaptive.coalescePartitions.initialPartitionNum` exists or not.

### How was this patch tested?

a new test

Closes #32265 from yaooqinn/SPARK-35168.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
Kent Yao 2021-04-25 20:27:12 +08:00
parent b108e7fff3
commit 5b1353f690
2 changed files with 16 additions and 1 deletions

View file

@ -139,7 +139,7 @@ case class SetCommand(kv: Option[(String, Option[String])])
s"showing ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
Seq(Row(
SQLConf.SHUFFLE_PARTITIONS.key,
sparkSession.sessionState.conf.numShufflePartitions.toString))
sparkSession.sessionState.conf.defaultNumShufflePartitions.toString))
}
(keyValueOutput, runFunc)

View file

@ -117,6 +117,21 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
}
}
test(s"SPARK-35168: ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} should respect" +
s" ${SQLConf.SHUFFLE_PARTITIONS.key}") {
spark.sessionState.conf.clear()
try {
sql(s"SET ${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key}=true")
sql(s"SET ${SQLConf.COALESCE_PARTITIONS_ENABLED.key}=true")
sql(s"SET ${SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key}=1")
sql(s"SET ${SQLConf.SHUFFLE_PARTITIONS.key}=2")
checkAnswer(sql(s"SET ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}"),
Row(SQLConf.SHUFFLE_PARTITIONS.key, "2"))
} finally {
spark.sessionState.conf.clear()
}
}
test("SPARK-31234: reset will not change static sql configs and spark core configs") {
val conf = spark.sparkContext.getConf.getAll.toMap
val appName = conf.get("spark.app.name")