[SPARK-33806][SQL] limit partition num to 1 when distributing by foldable expressions

### What changes were proposed in this pull request?

Using a DISTRIBUTE BY clause with a literal to coalesce partitions appears to be a popular pattern in pure SQL data processing.

For example
```
insert into table src select * from values (1), (2), (3) t(a) distribute by 1
```

Users may want the final output to be a single data file, but that is not always what happens. Spark always creates a file for partition 0 whether or not it contains data, so when all the data goes to a partition with index > 0, there are always two output files and part-00000 is empty. In addition, many unnecessary empty tasks are launched.

When users repeat the insert statement daily, hourly, or even more frequently, this causes small-file issues.
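To see why all rows can land in a non-zero partition, note that hash partitioning a constant key maps every row to the same single partition index. A minimal plain-Scala sketch (illustrative only; Spark actually uses Murmur3 hashing, but any deterministic hash behaves the same way for a constant key):

```scala
// Illustrative only: not Spark's actual partitioner (Spark uses
// Murmur3), but any deterministic hash behaves identically here.
def partitionOf(key: Any, numPartitions: Int): Int = {
  val h = key.hashCode % numPartitions
  if (h < 0) h + numPartitions else h
}

// With a constant key, every row resolves to the same index, so all
// data lands in one partition and the others stay empty.
val idx = partitionOf(1, 3)
```

Whether that single index happens to be 0 depends on the hash value, which is why `part-00000` often ends up as the empty file.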

```
spark-sql> set spark.sql.shuffle.partitions=3;drop table if exists test2;create table test2 using parquet as select * from values (1), (2), (3) t(a) distribute by 1;

$ tree /Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-20201202/spark-warehouse/test2/ -s
/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-20201202/spark-warehouse/test2/
├── [          0]  _SUCCESS
├── [        298]  part-00000-5dc19733-9405-414b-9681-d25c4d3e9ee6-c000.snappy.parquet
└── [        426]  part-00001-5dc19733-9405-414b-9681-d25c4d3e9ee6-c000.snappy.parquet
```

To avoid this, users currently have a few workarounds:

1. use `distribute by null` so that all data goes to partition 0
2. set `spark.sql.adaptive.enabled` to `true` so that Spark coalesces partitions automatically
3. use repartition hints instead of `distribute by`
4. set `spark.sql.shuffle.partitions` to 1
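The adaptive-execution workaround typically pairs two standard Spark SQL settings (`spark.sql.adaptive.coalescePartitions.enabled` is on by default in Spark 3.x, shown here only for clarity):

```
set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.coalescePartitions.enabled=true;
```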

In this PR, we set the partition number to 1 in this particular case, i.e. when all `distribute by` expressions are foldable.
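The essence of the fix can be sketched outside Spark's codebase (the `Expr` stand-in and `resolveNumPartitions` are hypothetical names, mirroring the logic the patch adds to `RepartitionByExpression`):

```scala
// Simplified stand-in for Spark's expression trait: only the
// foldability flag matters for this logic.
case class Expr(foldable: Boolean)

// Mirrors the new resolution order: an explicit partition count wins;
// otherwise all-foldable partition expressions collapse to 1 partition;
// otherwise fall back to the configured shuffle partition count.
def resolveNumPartitions(
    optNumPartitions: Option[Int],
    partitionExpressions: Seq[Expr],
    defaultShufflePartitions: Int): Int = {
  optNumPartitions.getOrElse {
    if (partitionExpressions.forall(_.foldable)) 1
    else defaultShufflePartitions
  }
}
```

Under this sketch, `distribute by 1` yields a single foldable expression and resolves to 1 partition, while `distribute by a` keeps the configured default.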

### Why are the changes needed?

1. It avoids small-file issues.
2. It avoids launching unnecessary empty tasks when adaptive execution is disabled.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new test

Closes #30800 from yaooqinn/SPARK-33806.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Authored 2020-12-16 14:09:28 -08:00 by Kent Yao, committed by Dongjoon Hyun.
Commit 728a1298af (parent 3d0323401f). 2 changed files with 24 additions and 2 deletions.


```diff
@@ -1017,7 +1017,16 @@ case class RepartitionByExpression(
     child: LogicalPlan,
     optNumPartitions: Option[Int]) extends RepartitionOperation {
-  val numPartitions = optNumPartitions.getOrElse(SQLConf.get.numShufflePartitions)
+  val numPartitions = if (optNumPartitions.nonEmpty) {
+    optNumPartitions.get
+  } else {
+    if (partitionExpressions.forall(_.foldable)) {
+      1
+    } else {
+      SQLConf.get.numShufflePartitions
+    }
+  }
   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
   val partitioning: Partitioning = {
```


```diff
@@ -27,7 +27,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Partial}
 import org.apache.spark.sql.catalyst.optimizer.{ConvertToLocalRelation, NestedColumnAliasingSuite}
-import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.catalyst.plans.logical.{Project, RepartitionByExpression}
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
@@ -3732,6 +3732,19 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       checkAnswer(sql("SELECT s LIKE 'm@@ca' ESCAPE '@' FROM df"), Row(true))
     }
   }
+
+  test("limit partition num to 1 when distributing by foldable expressions") {
+    withSQLConf((SQLConf.SHUFFLE_PARTITIONS.key, "5")) {
+      Seq(1, "1, 2", null, "version()").foreach { expr =>
+        val plan = sql(s"select * from values (1), (2), (3) t(a) distribute by $expr")
+          .queryExecution.optimizedPlan
+        val res = plan.collect {
+          case r: RepartitionByExpression if r.numPartitions == 1 => true
+        }
+        assert(res.nonEmpty)
+      }
+    }
+  }
 }
 case class Foo(bar: Option[String])
```