### What changes were proposed in this pull request?
Update the Spark SQL document menu and join strategy hints.

### Why are the changes needed?
- Several new changes in the Spark SQL document didn't change the menu-sql.yaml correspondingly.
- Update the demo code for join strategy hints.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Document change only.

Closes #26917 from xuanyuanking/SPARK-30278.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
layout | title | displayTitle | license |
---|---|---|---|
global | Performance Tuning | Performance Tuning | Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. |
- Table of contents {:toc}
For some workloads, it is possible to improve performance by either caching data in memory, or by turning on some experimental options.
Caching Data In Memory
Spark SQL can cache tables using an in-memory columnar format by calling `spark.catalog.cacheTable("tableName")` or `dataFrame.cache()`. Then Spark SQL will scan only required columns and will automatically tune compression to minimize memory usage and GC pressure. You can call `spark.catalog.uncacheTable("tableName")` to remove the table from memory.
Configuration of in-memory caching can be done using the `setConf` method on `SparkSession` or by running `SET key=value` commands using SQL.
Property Name | Default | Meaning |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed | true | When set to true Spark SQL will automatically select a compression codec for each column based on statistics of the data. |
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data. |
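As a small illustration of the `SET key=value` form mentioned above, the following sketch renders one command per property from a plain dictionary. The helper name `render_set_commands` and the batch size of 20000 are assumptions made for this example; they are not part of Spark, and the value is not a tuning recommendation.

```python
# Hypothetical helper (not part of Spark): renders `SET key=value` commands.
def render_set_commands(conf):
    """Return one `SET key=value` statement per configuration entry."""
    commands = []
    for key, value in conf.items():
        # Render Python booleans in lowercase, matching the defaults in the table.
        if isinstance(value, bool):
            value = str(value).lower()
        commands.append(f"SET {key}={value}")
    return commands


caching_conf = {
    "spark.sql.inMemoryColumnarStorage.compressed": True,
    # Illustrative value only; the documented default is 10000.
    "spark.sql.inMemoryColumnarStorage.batchSize": 20000,
}

for command in render_set_commands(caching_conf):
    print(command)
```

Each resulting string could then be run as a SQL command in a Spark SQL session.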
Other Configuration Options
The following options can also be used to tune the performance of query execution. It is possible that these options will be deprecated in a future release as more optimizations are performed automatically.
Property Name | Default | Meaning |
---|---|---|
spark.sql.files.maxPartitionBytes | 134217728 (128 MB) | The maximum number of bytes to pack into a single partition when reading files. |
spark.sql.files.openCostInBytes | 4194304 (4 MB) | The estimated cost to open a file, measured by the number of bytes that could be scanned in the same time. This is used when putting multiple files into a partition. It is better to over-estimate; then the partitions with small files will be faster than partitions with bigger files (which are scheduled first). |
spark.sql.broadcastTimeout | 300 | Timeout in seconds for the broadcast wait time in broadcast joins. |
spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1, broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan` has been run. |
spark.sql.shuffle.partitions | 200 | Configures the number of partitions to use when shuffling data for joins or aggregations. |
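To make the interaction between `spark.sql.files.maxPartitionBytes` and `spark.sql.files.openCostInBytes` more concrete, here is a simplified model in plain Python. It is a sketch under stated assumptions, not Spark's actual implementation: it assumes files are considered largest first, that each file is charged its size plus the open cost, and that a new partition starts when the next file would push the running total past the limit. It does not model splitting a large file across partitions. The function name `pack_files` is hypothetical.

```python
MB = 1024 * 1024
MAX_PARTITION_BYTES = 134217728  # documented default of spark.sql.files.maxPartitionBytes
OPEN_COST_IN_BYTES = 4194304     # documented default of spark.sql.files.openCostInBytes


def pack_files(file_sizes, max_partition_bytes=MAX_PARTITION_BYTES,
               open_cost=OPEN_COST_IN_BYTES):
    """Greedily group file sizes (in bytes) into partitions.

    Assumption for this sketch: every file added to a partition is charged
    its size plus `open_cost`, and files are visited largest first.
    """
    partitions = []
    current = []
    current_bytes = 0
    for size in sorted(file_sizes, reverse=True):
        # Close the current partition if this file would exceed the limit.
        if current and current_bytes + size > max_partition_bytes:
            partitions.append(current)
            current = []
            current_bytes = 0
        current.append(size)
        current_bytes += size + open_cost
    if current:
        partitions.append(current)
    return partitions


sizes = [100 * MB, 60 * MB, 20 * MB, 20 * MB, 20 * MB]
for partition in pack_files(sizes):
    print([size // MB for size in partition])
```

Under these assumptions the five files land in three partitions (100 MB; 60 + 20 + 20 MB; 20 MB). With `open_cost=0` the same files fit into two, which illustrates how a larger open cost estimate places fewer small files in each partition.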
Join Strategy Hints for SQL Queries
The join strategy hints, namely `BROADCAST`, `MERGE`, `SHUFFLE_HASH` and `SHUFFLE_REPLICATE_NL`, instruct Spark to use the hinted strategy on each specified relation when joining them with another relation. For example, when the `BROADCAST` hint is used on table 't1', broadcast join (either broadcast hash join or broadcast nested loop join depending on whether there is any equi-join key) with 't1' as the build side will be prioritized by Spark even if the size of table 't1' suggested by the statistics is above the configuration `spark.sql.autoBroadcastJoinThreshold`.
When different join strategy hints are specified on both sides of a join, Spark prioritizes the `BROADCAST` hint over the `MERGE` hint over the `SHUFFLE_HASH` hint over the `SHUFFLE_REPLICATE_NL` hint. When both sides are specified with the `BROADCAST` hint or the `SHUFFLE_HASH` hint, Spark will pick the build side based on the join type and the sizes of the relations.
Note that there is no guarantee that Spark will choose the join strategy specified in the hint since a specific strategy may not support all join types.
{% highlight scala %}
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
{% endhighlight %}

{% highlight java %}
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show();
{% endhighlight %}

{% highlight python %}
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
{% endhighlight %}

{% highlight r %}
src <- sql("SELECT * FROM src")
records <- sql("SELECT * FROM records")
head(join(src, hint(records, "broadcast"), src$key == records$key))
{% endhighlight %}

{% highlight sql %}
-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key
{% endhighlight %}
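The precedence among hints described above can be sketched as a small lookup in plain Python. This only models the stated ordering when the two sides of a join carry different hints; it does not model build-side selection or whether a strategy supports a given join type, and it does not handle the `BROADCASTJOIN` and `MAPJOIN` aliases. The function name `winning_hint` is hypothetical.

```python
# Precedence stated in the text, highest priority first.
HINT_PRIORITY = ["BROADCAST", "MERGE", "SHUFFLE_HASH", "SHUFFLE_REPLICATE_NL"]


def winning_hint(left_hint, right_hint):
    """Return the hint that takes precedence, or None if neither side is hinted."""
    hints = [h.upper() for h in (left_hint, right_hint) if h is not None]
    for hint in hints:
        if hint not in HINT_PRIORITY:
            raise ValueError(f"unknown join strategy hint: {hint}")
    if not hints:
        return None
    # A lower index in HINT_PRIORITY means a higher priority.
    return min(hints, key=HINT_PRIORITY.index)


print(winning_hint("merge", "broadcast"))
print(winning_hint("shuffle_replicate_nl", "shuffle_hash"))
```

Even when a hint wins under this ordering, Spark may still choose a different strategy if the hinted one does not support the join type.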
Coalesce Hints for SQL Queries
Coalesce hints allow Spark SQL users to control the number of output files, just like `coalesce`, `repartition` and `repartitionByRange` in the Dataset API. They can be used for performance tuning and reducing the number of output files. The "COALESCE" hint takes only a partition number as a parameter. The "REPARTITION" hint takes a partition number, columns, or both as parameters. The "REPARTITION_BY_RANGE" hint must have column names; a partition number is optional.
{% highlight sql %}
SELECT /*+ COALESCE(3) */ * FROM t
SELECT /*+ REPARTITION(3) */ * FROM t
SELECT /*+ REPARTITION(c) */ * FROM t
SELECT /*+ REPARTITION(3, c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t
{% endhighlight %}
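The parameter rules for the three hints can be captured in a short validation sketch in plain Python. It builds the hint comment text and rejects combinations the description above rules out. The helper name `partitioning_hint` is hypothetical, and the checks reflect only the rules stated in this section, not Spark's own parser.

```python
def partitioning_hint(name, num_partitions=None, columns=()):
    """Build a hint comment such as `/*+ REPARTITION(3, c) */`."""
    name = name.upper()
    columns = list(columns)
    if name == "COALESCE":
        # Only a partition number is accepted.
        if num_partitions is None or columns:
            raise ValueError("COALESCE takes only a partition number")
    elif name == "REPARTITION":
        # A partition number, columns, or both.
        if num_partitions is None and not columns:
            raise ValueError("REPARTITION needs a partition number, columns, or both")
    elif name == "REPARTITION_BY_RANGE":
        # Columns are required; the partition number is optional.
        if not columns:
            raise ValueError("REPARTITION_BY_RANGE requires column names")
    else:
        raise ValueError(f"unknown hint: {name}")
    args = ([str(num_partitions)] if num_partitions is not None else []) + columns
    return f"/*+ {name}({', '.join(args)}) */"


print(f"SELECT {partitioning_hint('REPARTITION', 3, ['c'])} * FROM t")
```

The printed statement matches the `REPARTITION(3, c)` query listed above.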