---
layout: global
title: Performance Tuning
displayTitle: Performance Tuning
---

* Table of contents
{:toc}

For some workloads, it is possible to improve performance by either caching data in memory, or by
turning on some experimental options.

## Caching Data In Memory

Spark SQL can cache tables using an in-memory columnar format by calling `spark.catalog.cacheTable("tableName")` or `dataFrame.cache()`.
Then Spark SQL will scan only required columns and will automatically tune compression to minimize
memory usage and GC pressure. You can call `spark.catalog.uncacheTable("tableName")` to remove the table from memory.
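
The same caching can also be controlled from SQL; a minimal sketch, assuming a table named `tableName` is already registered:

{% highlight sql %}
-- Cache the table in the in-memory columnar format
CACHE TABLE tableName;

-- Remove the table from memory once it is no longer needed
UNCACHE TABLE tableName;
{% endhighlight %}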

Configuration of in-memory caching can be done using the `setConf` method on `SparkSession` or by running
`SET key=value` commands using SQL.
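
For example, the batch size option from the table below can be set for the current session as follows (the value 5000 here is only illustrative):

{% highlight sql %}
-- Equivalent to spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "5000")
SET spark.sql.inMemoryColumnarStorage.batchSize=5000;
{% endhighlight %}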

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
  <td><code>spark.sql.inMemoryColumnarStorage.compressed</code></td>
  <td>true</td>
  <td>
    When set to true Spark SQL will automatically select a compression codec for each column based
    on statistics of the data.
  </td>
</tr>
<tr>
  <td><code>spark.sql.inMemoryColumnarStorage.batchSize</code></td>
  <td>10000</td>
  <td>
    Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization
    and compression, but risk OOMs when caching data.
  </td>
</tr>
</table>

## Other Configuration Options

The following options can also be used to tune the performance of query execution. It is possible
that these options will be deprecated in a future release as more optimizations are performed automatically.

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
  <td><code>spark.sql.files.maxPartitionBytes</code></td>
  <td>134217728 (128 MB)</td>
  <td>
    The maximum number of bytes to pack into a single partition when reading files.
  </td>
</tr>
<tr>
  <td><code>spark.sql.files.openCostInBytes</code></td>
  <td>4194304 (4 MB)</td>
  <td>
    The estimated cost to open a file, measured by the number of bytes that could be scanned in the same
    time. This is used when packing multiple files into a partition. It is better to over-estimate;
    then the partitions with small files will be faster than partitions with bigger files (which are
    scheduled first).
  </td>
</tr>
<tr>
  <td><code>spark.sql.broadcastTimeout</code></td>
  <td>300</td>
  <td>
    <p>
      Timeout in seconds for the broadcast wait time in broadcast joins.
    </p>
  </td>
</tr>
<tr>
  <td><code>spark.sql.autoBroadcastJoinThreshold</code></td>
  <td>10485760 (10 MB)</td>
  <td>
    Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when
    performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently
    statistics are only supported for Hive Metastore tables where the command
    <code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan</code> has been run.
  </td>
</tr>
<tr>
  <td><code>spark.sql.shuffle.partitions</code></td>
  <td>200</td>
  <td>
    Configures the number of partitions to use when shuffling data for joins or aggregations.
  </td>
</tr>
</table>
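
As a sketch of how these options are typically applied per session (the values below are illustrative, not recommendations):

{% highlight sql %}
-- Use fewer shuffle partitions for a small dataset
SET spark.sql.shuffle.partitions=50;

-- Disable automatic broadcast joins by setting the threshold to -1
SET spark.sql.autoBroadcastJoinThreshold=-1;
{% endhighlight %}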

## Broadcast Hint for SQL Queries

The `BROADCAST` hint guides Spark to broadcast each specified table when joining it with another table or view.
When Spark decides on the join method, the broadcast hash join (i.e., BHJ) is preferred,
even if the statistics are above the configuration `spark.sql.autoBroadcastJoinThreshold`.
When both sides of a join are specified, Spark broadcasts the one having the lower statistics.
Note that Spark does not guarantee that BHJ is always chosen, since not all cases (e.g. full outer join)
support BHJ. When the broadcast nested loop join is selected, we still respect the hint.

<div class="codetabs">

<div data-lang="scala" markdown="1">

{% highlight scala %}
import org.apache.spark.sql.functions.broadcast
broadcast(spark.table("src")).join(spark.table("records"), "key").show()
{% endhighlight %}

</div>

<div data-lang="java" markdown="1">

{% highlight java %}
import static org.apache.spark.sql.functions.broadcast;
broadcast(spark.table("src")).join(spark.table("records"), "key").show();
{% endhighlight %}

</div>

<div data-lang="python" markdown="1">

{% highlight python %}
from pyspark.sql.functions import broadcast
broadcast(spark.table("src")).join(spark.table("records"), "key").show()
{% endhighlight %}

</div>

<div data-lang="r" markdown="1">

{% highlight r %}
src <- sql("SELECT * FROM src")
records <- sql("SELECT * FROM records")
head(join(broadcast(src), records, src$key == records$key))
{% endhighlight %}

</div>

<div data-lang="sql" markdown="1">

{% highlight sql %}
-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key
{% endhighlight %}

</div>
</div>