[SPARK-23029][DOCS] Specifying default units of configuration entries

## What changes were proposed in this pull request?
This PR completes the docs, specifying the default units assumed in configuration entries of type size.
This is crucial since unit-less values are accepted and the user might assume the base unit is bytes, which in most cases it is not, leading to hard-to-debug problems.

## How was this patch tested?
This patch updates only documentation only.

Author: Fernando Pereira <fernando.pereira@epfl.ch>

Closes #20269 from ferdonline/docs_units.
This commit is contained in:
Fernando Pereira 2018-01-18 13:02:03 -06:00 committed by Sean Owen
parent cf7ee1767d
commit 9678941f54
3 changed files with 85 additions and 68 deletions

View file

@ -640,9 +640,9 @@ private[spark] object SparkConf extends Logging {
translation = s => s"${s.toLong * 10}s")),
"spark.reducer.maxSizeInFlight" -> Seq(
AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
"spark.kryoserializer.buffer" ->
Seq(AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
translation = s => s"${(s.toDouble * 1000).toInt}k")),
"spark.kryoserializer.buffer" -> Seq(
AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
translation = s => s"${(s.toDouble * 1000).toInt}k")),
"spark.kryoserializer.buffer.max" -> Seq(
AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
"spark.shuffle.file.buffer" -> Seq(

View file

@ -38,10 +38,13 @@ package object config {
ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.createWithDefault(false)
private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
.doc("Amount of memory to use for the driver process, in MiB unless otherwise specified.")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")
private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.driver.memoryOverhead")
.doc("The amount of off-heap memory to be allocated per driver in cluster mode, " +
"in MiB unless otherwise specified.")
.bytesConf(ByteUnit.MiB)
.createOptional
@ -62,6 +65,7 @@ package object config {
.createWithDefault(false)
private[spark] val EVENT_LOG_OUTPUT_BUFFER_SIZE = ConfigBuilder("spark.eventLog.buffer.kb")
.doc("Buffer size to use when writing to output streams, in KiB unless otherwise specified.")
.bytesConf(ByteUnit.KiB)
.createWithDefaultString("100k")
@ -81,10 +85,13 @@ package object config {
ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.createWithDefault(false)
private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
.doc("Amount of memory to use per executor process, in MiB unless otherwise specified.")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")
private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.executor.memoryOverhead")
.doc("The amount of off-heap memory to be allocated per executor in cluster mode, " +
"in MiB unless otherwise specified.")
.bytesConf(ByteUnit.MiB)
.createOptional
@ -353,7 +360,7 @@ package object config {
private[spark] val BUFFER_WRITE_CHUNK_SIZE =
ConfigBuilder("spark.buffer.write.chunkSize")
.internal()
.doc("The chunk size during writing out the bytes of ChunkedByteBuffer.")
.doc("The chunk size in bytes during writing out the bytes of ChunkedByteBuffer.")
.bytesConf(ByteUnit.BYTE)
.checkValue(_ <= Int.MaxValue, "The chunk size during writing out the bytes of" +
" ChunkedByteBuffer should not larger than Int.MaxValue.")
@ -368,9 +375,9 @@ package object config {
private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD =
ConfigBuilder("spark.shuffle.accurateBlockThreshold")
.doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
"record the size accurately if it's above this config. This helps to prevent OOM by " +
"avoiding underestimating shuffle block size when fetch shuffle blocks.")
.doc("Threshold in bytes above which the size of shuffle blocks in " +
"HighlyCompressedMapStatus is accurately recorded. This helps to prevent OOM " +
"by avoiding underestimating shuffle block size when fetch shuffle blocks.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(100 * 1024 * 1024)
@ -389,23 +396,23 @@ package object config {
private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
.doc("This configuration limits the number of remote blocks being fetched per reduce task" +
" from a given host port. When a large number of blocks are being requested from a given" +
" address in a single fetch or simultaneously, this could crash the serving executor or" +
" Node Manager. This is especially useful to reduce the load on the Node Manager when" +
" external shuffle is enabled. You can mitigate the issue by setting it to a lower value.")
.doc("This configuration limits the number of remote blocks being fetched per reduce task " +
"from a given host port. When a large number of blocks are being requested from a given " +
"address in a single fetch or simultaneously, this could crash the serving executor or " +
"Node Manager. This is especially useful to reduce the load on the Node Manager when " +
"external shuffle is enabled. You can mitigate the issue by setting it to a lower value.")
.intConf
.checkValue(_ > 0, "The max no. of blocks in flight cannot be non-positive.")
.createWithDefault(Int.MaxValue)
private[spark] val MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM =
ConfigBuilder("spark.maxRemoteBlockSizeFetchToMem")
.doc("Remote block will be fetched to disk when size of the block is " +
"above this threshold. This is to avoid a giant request takes too much memory. We can " +
"enable this config by setting a specific value(e.g. 200m). Note this configuration will " +
"affect both shuffle fetch and block manager remote block fetch. For users who " +
"enabled external shuffle service, this feature can only be worked when external shuffle" +
" service is newer than Spark 2.2.")
.doc("Remote block will be fetched to disk when size of the block is above this threshold " +
"in bytes. This is to avoid a giant request takes too much memory. We can enable this " +
"config by setting a specific value(e.g. 200m). Note this configuration will affect " +
"both shuffle fetch and block manager remote block fetch. For users who enabled " +
"external shuffle service, this feature can only be worked when external shuffle" +
"service is newer than Spark 2.2.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)
@ -419,9 +426,9 @@ package object config {
private[spark] val SHUFFLE_FILE_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.file.buffer")
.doc("Size of the in-memory buffer for each shuffle file output stream. " +
"These buffers reduce the number of disk seeks and system calls made " +
"in creating intermediate shuffle files.")
.doc("Size of the in-memory buffer for each shuffle file output stream, in KiB unless " +
"otherwise specified. These buffers reduce the number of disk seeks and system calls " +
"made in creating intermediate shuffle files.")
.bytesConf(ByteUnit.KiB)
.checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
@ -430,7 +437,7 @@ package object config {
private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
.doc("The file system for this buffer size after each partition " +
"is written in unsafe shuffle writer.")
"is written in unsafe shuffle writer. In KiB unless otherwise specified.")
.bytesConf(ByteUnit.KiB)
.checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
s"The buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
@ -438,7 +445,7 @@ package object config {
private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.spill.diskWriteBufferSize")
.doc("The buffer size to use when writing the sorted records to an on-disk file.")
.doc("The buffer size, in bytes, to use when writing the sorted records to an on-disk file.")
.bytesConf(ByteUnit.BYTE)
.checkValue(v => v > 0 && v <= Int.MaxValue,
s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.")

View file

@ -58,6 +58,10 @@ The following format is accepted:
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)
While numbers without units are generally interpreted as bytes, a few are interpreted as KiB or MiB.
See documentation of individual configuration properties. Specifying units is desirable where
possible.
## Dynamically Loading Spark Properties
In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For
@ -136,9 +140,9 @@ of the most common options to set are:
<td><code>spark.driver.maxResultSize</code></td>
<td>1g</td>
<td>
Limit of total size of serialized results of all partitions for each Spark action (e.g. collect).
Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total size
is above this limit.
Limit of total size of serialized results of all partitions for each Spark action (e.g.
collect) in bytes. Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total
size is above this limit.
Having a high limit may cause out-of-memory errors in driver (depends on spark.driver.memory
and memory overhead of objects in JVM). Setting a proper limit can protect the driver from
out-of-memory errors.
@ -148,10 +152,10 @@ of the most common options to set are:
<td><code>spark.driver.memory</code></td>
<td>1g</td>
<td>
Amount of memory to use for the driver process, i.e. where SparkContext is initialized.
(e.g. <code>1g</code>, <code>2g</code>).
<br /><em>Note:</em> In client mode, this config must not be set through the <code>SparkConf</code>
Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in MiB
unless otherwise specified (e.g. <code>1g</code>, <code>2g</code>).
<br />
<em>Note:</em> In client mode, this config must not be set through the <code>SparkConf</code>
directly in your application, because the driver JVM has already started at that point.
Instead, please set this through the <code>--driver-memory</code> command line option
or in your default properties file.
@ -161,27 +165,28 @@ of the most common options to set are:
<td><code>spark.driver.memoryOverhead</code></td>
<td>driverMemory * 0.10, with minimum of 384 </td>
<td>
The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is
memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
This tends to grow with the container size (typically 6-10%). This option is currently supported
on YARN and Kubernetes.
The amount of off-heap memory to be allocated per driver in cluster mode, in MiB unless
otherwise specified. This is memory that accounts for things like VM overheads, interned strings,
other native overheads, etc. This tends to grow with the container size (typically 6-10%).
This option is currently supported on YARN and Kubernetes.
</td>
</tr>
<tr>
<td><code>spark.executor.memory</code></td>
<td>1g</td>
<td>
Amount of memory to use per executor process (e.g. <code>2g</code>, <code>8g</code>).
Amount of memory to use per executor process, in MiB unless otherwise specified.
(e.g. <code>2g</code>, <code>8g</code>).
</td>
</tr>
<tr>
<td><code>spark.executor.memoryOverhead</code></td>
<td>executorMemory * 0.10, with minimum of 384 </td>
<td>
The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that
accounts for things like VM overheads, interned strings, other native overheads, etc. This tends
to grow with the executor size (typically 6-10%). This option is currently supported on YARN and
Kubernetes.
The amount of off-heap memory to be allocated per executor, in MiB unless otherwise specified.
This is memory that accounts for things like VM overheads, interned strings, other native
overheads, etc. This tends to grow with the executor size (typically 6-10%).
This option is currently supported on YARN and Kubernetes.
</td>
</tr>
<tr>
@ -431,8 +436,9 @@ Apart from these, the following properties are also available, and may be useful
<td>512m</td>
<td>
Amount of memory to use per python worker process during aggregation, in the same
format as JVM memory strings (e.g. <code>512m</code>, <code>2g</code>). If the memory
used during aggregation goes above this amount, it will spill the data into disks.
format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t")
(e.g. <code>512m</code>, <code>2g</code>).
If the memory used during aggregation goes above this amount, it will spill the data into disks.
</td>
</tr>
<tr>
@ -540,9 +546,10 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.reducer.maxSizeInFlight</code></td>
<td>48m</td>
<td>
Maximum size of map outputs to fetch simultaneously from each reduce task. Since
each output requires us to create a buffer to receive it, this represents a fixed memory
overhead per reduce task, so keep it small unless you have a large amount of memory.
Maximum size of map outputs to fetch simultaneously from each reduce task, in MiB unless
otherwise specified. Since each output requires us to create a buffer to receive it, this
represents a fixed memory overhead per reduce task, so keep it small unless you have a
large amount of memory.
</td>
</tr>
<tr>
@ -570,7 +577,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.maxRemoteBlockSizeFetchToMem</code></td>
<td>Long.MaxValue</td>
<td>
The remote block will be fetched to disk when size of the block is above this threshold.
The remote block will be fetched to disk when size of the block is above this threshold in bytes.
This is to avoid a giant request takes too much memory. We can enable this config by setting
a specific value(e.g. 200m). Note this configuration will affect both shuffle fetch
and block manager remote block fetch. For users who enabled external shuffle service,
@ -589,8 +596,9 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.shuffle.file.buffer</code></td>
<td>32k</td>
<td>
Size of the in-memory buffer for each shuffle file output stream. These buffers
reduce the number of disk seeks and system calls made in creating intermediate shuffle files.
Size of the in-memory buffer for each shuffle file output stream, in KiB unless otherwise
specified. These buffers reduce the number of disk seeks and system calls made in creating
intermediate shuffle files.
</td>
</tr>
<tr>
@ -651,7 +659,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.shuffle.service.index.cache.size</code></td>
<td>100m</td>
<td>
Cache entries limited to the specified memory footprint.
Cache entries limited to the specified memory footprint in bytes.
</td>
</tr>
<tr>
@ -685,9 +693,9 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.shuffle.accurateBlockThreshold</code></td>
<td>100 * 1024 * 1024</td>
<td>
When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will record the
size accurately if it's above this config. This helps to prevent OOM by avoiding
underestimating shuffle block size when fetch shuffle blocks.
Threshold in bytes above which the size of shuffle blocks in HighlyCompressedMapStatus is
accurately recorded. This helps to prevent OOM by avoiding underestimating shuffle
block size when fetch shuffle blocks.
</td>
</tr>
<tr>
@ -779,7 +787,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.eventLog.buffer.kb</code></td>
<td>100k</td>
<td>
Buffer size in KB to use when writing to output streams.
Buffer size to use when writing to output streams, in KiB unless otherwise specified.
</td>
</tr>
<tr>
@ -917,7 +925,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.io.compression.lz4.blockSize</code></td>
<td>32k</td>
<td>
Block size used in LZ4 compression, in the case when LZ4 compression codec
Block size in bytes used in LZ4 compression, in the case when LZ4 compression codec
is used. Lowering this block size will also lower shuffle memory usage when LZ4 is used.
</td>
</tr>
@ -925,7 +933,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.io.compression.snappy.blockSize</code></td>
<td>32k</td>
<td>
Block size used in Snappy compression, in the case when Snappy compression codec
Block size in bytes used in Snappy compression, in the case when Snappy compression codec
is used. Lowering this block size will also lower shuffle memory usage when Snappy is used.
</td>
</tr>
@ -941,7 +949,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.io.compression.zstd.bufferSize</code></td>
<td>32k</td>
<td>
Buffer size used in Zstd compression, in the case when Zstd compression codec
Buffer size in bytes used in Zstd compression, in the case when Zstd compression codec
is used. Lowering this size will lower the shuffle memory usage when Zstd is used, but it
might increase the compression cost because of excessive JNI call overhead.
</td>
@ -1001,8 +1009,8 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.kryoserializer.buffer.max</code></td>
<td>64m</td>
<td>
Maximum allowable size of Kryo serialization buffer. This must be larger than any
object you attempt to serialize and must be less than 2048m.
Maximum allowable size of Kryo serialization buffer, in MiB unless otherwise specified.
This must be larger than any object you attempt to serialize and must be less than 2048m.
Increase this if you get a "buffer limit exceeded" exception inside Kryo.
</td>
</tr>
@ -1010,9 +1018,9 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.kryoserializer.buffer</code></td>
<td>64k</td>
<td>
Initial size of Kryo's serialization buffer. Note that there will be one buffer
<i>per core</i> on each worker. This buffer will grow up to
<code>spark.kryoserializer.buffer.max</code> if needed.
Initial size of Kryo's serialization buffer, in KiB unless otherwise specified.
Note that there will be one buffer <i>per core</i> on each worker. This buffer will grow up to
<code>spark.kryoserializer.buffer.max</code> if needed.
</td>
</tr>
<tr>
@ -1086,7 +1094,8 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.memory.offHeap.enabled</code></td>
<td>false</td>
<td>
If true, Spark will attempt to use off-heap memory for certain operations. If off-heap memory use is enabled, then <code>spark.memory.offHeap.size</code> must be positive.
If true, Spark will attempt to use off-heap memory for certain operations. If off-heap memory
use is enabled, then <code>spark.memory.offHeap.size</code> must be positive.
</td>
</tr>
<tr>
@ -1094,7 +1103,8 @@ Apart from these, the following properties are also available, and may be useful
<td>0</td>
<td>
The absolute amount of memory in bytes which can be used for off-heap allocation.
This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly.
This setting has no impact on heap memory usage, so if your executors' total memory consumption
must fit within some hard limit then be sure to shrink your JVM heap size accordingly.
This must be set to a positive value when <code>spark.memory.offHeap.enabled=true</code>.
</td>
</tr>
@ -1202,9 +1212,9 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.broadcast.blockSize</code></td>
<td>4m</td>
<td>
Size of each piece of a block for <code>TorrentBroadcastFactory</code>.
Too large a value decreases parallelism during broadcast (makes it slower); however, if it is
too small, <code>BlockManager</code> might take a performance hit.
Size of each piece of a block for <code>TorrentBroadcastFactory</code>, in KiB unless otherwise
specified. Too large a value decreases parallelism during broadcast (makes it slower); however,
if it is too small, <code>BlockManager</code> might take a performance hit.
</td>
</tr>
<tr>
@ -1312,7 +1322,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.storage.memoryMapThreshold</code></td>
<td>2m</td>
<td>
Size of a block above which Spark memory maps when reading a block from disk.
Size in bytes of a block above which Spark memory maps when reading a block from disk.
This prevents Spark from memory mapping very small blocks. In general, memory
mapping has high overhead for blocks close to or below the page size of the operating system.
</td>