## What changes were proposed in this pull request?
Spark currently uses TimSort for all in-memory sorts, including sorts done for shuffle. One low-hanging fruit is to use radix sort when possible (e.g. sorting by integer keys). This PR adds a radix sort implementation to the unsafe sort package and switches shuffles and sorts to use it when possible.
The current implementation does not have special support for null values, so we cannot radix-sort `LongType`. I will address this in a follow-up PR.
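To illustrate the general technique (not the implementation added by this PR), here is a minimal sketch of a least-significant-digit radix sort over `long` keys that processes one byte per pass. The class name `RadixSortSketch` and its helpers are hypothetical, and the sketch assumes plain non-null keys.

```java
import java.util.Arrays;

final class RadixSortSketch {
    /** Returns a sorted copy of the input using 8 stable counting-sort passes, one per byte. */
    static long[] sort(long[] input) {
        long[] src = Arrays.copyOf(input, input.length);
        long[] dst = new long[input.length];
        for (int shift = 0; shift < 64; shift += 8) {
            // Histogram shifted by one slot so the prefix sum yields start offsets.
            int[] offsets = new int[257];
            for (long v : src) {
                offsets[digit(v, shift) + 1]++;
            }
            for (int d = 0; d < 256; d++) {
                offsets[d + 1] += offsets[d];
            }
            // Stable scatter into the destination buffer.
            for (long v : src) {
                dst[offsets[digit(v, shift)]++] = v;
            }
            long[] tmp = src;
            src = dst;
            dst = tmp;
        }
        return src;
    }

    /** Extracts one byte; the sign bit is flipped so negative keys sort before positive ones. */
    private static int digit(long v, int shift) {
        return (int) (((v ^ Long.MIN_VALUE) >>> shift) & 0xFF);
    }
}
```

Each pass is linear in the number of keys, which is why sorting by fewer significant bytes is faster in the microbenchmark below.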
## How was this patch tested?
Unit tests, enabling radix sort on existing tests. Microbenchmark results:
```
Running benchmark: radix sort 25000000
Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Linux 3.13.0-44-generic
Intel(R) Core(TM) i7-4600U CPU 2.10GHz
radix sort 25000000:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reference TimSort key prefix array           15546 / 15859          1.6         621.9       1.0X
reference Arrays.sort                          2416 / 2446         10.3          96.6       6.4X
radix sort one byte                              133 / 137        188.4           5.3     117.2X
radix sort two bytes                             255 / 258         98.2          10.2      61.1X
radix sort eight bytes                           991 / 997         25.2          39.6      15.7X
radix sort key prefix array                    1540 / 1563         16.2          61.6      10.1X
```
I also ran a mix of the supported TPCDS queries and compared TimSort vs RadixSort metrics. The overall benchmark ran ~10% faster with radix sort on. In the breakdown below, the radix-enabled sort phases averaged about 20x faster than TimSort; however, sorting is only a small fraction of the overall runtime. About half of the TPCDS queries were able to take advantage of radix sort.
```
TPCDS on master: 2499s real time, 8185s executor
- 1171s in TimSort, avg 267 MB/s
(note the /s accounting is weird here since dataSize counts the record sizes too)
TPCDS with radix enabled: 2294s real time, 7391s executor
- 596s in TimSort, avg 254 MB/s
- 26s in radix sort, avg 4.2 GB/s
```
cc davies rxin
Author: Eric Liang <ekl@databricks.com>
Closes #12490 from ericl/sort-benchmark.
## What changes were proposed in this pull request?
In general, `onDisconnected` is for dealing with unexpected network disconnections. When RpcEnv.shutdown is called, the disconnections are expected so RpcEnv should not fire these events.
This PR moves `dispatcher.stop()` above closing the connections so that when stopping RpcEnv, the endpoints won't receive `onDisconnected` events.
In addition, Outbox should not close the client since it will be reused by others. This PR fixes it as well.
## How was this patch tested?
test("SPARK-14699: RpcEnv.shutdown should not fire onDisconnected events")
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #12481 from zsxwing/SPARK-14699.
## What changes were proposed in this pull request?
#9241 implemented a mechanism to call spill() on SQL operators that support spilling when there is not enough memory for execution.
However, ExternalSorter and AppendOnlyMap in Spark core do not participate in it. This PR makes them benefit from #9241: when there is not enough memory for execution, memory can now be obtained by spilling ExternalSorter and AppendOnlyMap in Spark core.
## How was this patch tested?
Added two unit tests for it.
Author: Lianhui Wang <lianhuiwang09@gmail.com>
Closes #10024 from lianhuiwang/SPARK-4452-2.
## What changes were proposed in this pull request?
The `Accumulable.internal` flag is only used to avoid registering internal accumulators in two cases:
1. `TaskMetrics.createTempShuffleReadMetrics`: the accumulators in the temp shuffle read metrics should not be registered.
2. `TaskMetrics.fromAccumulatorUpdates`: the created task metrics are only used to post events, so the accumulators inside them should not be registered.
For 1, we can create a `TempShuffleReadMetrics` that doesn't create accumulators; it just keeps the data and merges it at the end.
For 2, we can un-register these accumulators immediately.
TODO: remove the `internal` flag in `AccumulableInfo` in a follow-up PR
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #12525 from cloud-fan/acc.
## What changes were proposed in this pull request?
The DAG visualization can cause an OOM when generating the DOT file.
This happens because clusters are not correctly deduped by a contains
check because they use the default equals implementation. This adds a
working equals implementation.
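The effect of the default `equals` on a `contains`-based dedup can be shown with a small sketch. The classes below are hypothetical stand-ins, not the actual UI classes touched by this patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

final class ClusterDedupSketch {
    /** Hypothetical cluster that inherits identity-based equals from Object. */
    static final class IdentityCluster {
        final String id;
        IdentityCluster(String id) { this.id = id; }
    }

    /** Same data, but with value-based equals and hashCode. */
    static final class ValueCluster {
        final String id;
        ValueCluster(String id) { this.id = id; }

        @Override
        public boolean equals(Object other) {
            return other instanceof ValueCluster && ((ValueCluster) other).id.equals(id);
        }

        @Override
        public int hashCode() { return Objects.hash(id); }
    }

    /** Dedups with a contains check, which delegates to equals. */
    static <T> List<T> dedup(List<T> items) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (!out.contains(item)) {
                out.add(item);
            }
        }
        return out;
    }
}
```

With `IdentityCluster`, two instances carrying the same id both survive the dedup; with `ValueCluster`, only one does.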
## How was this patch tested?
This adds a test suite that checks the new equals implementation.
Author: Ryan Blue <blue@apache.org>
Closes #12437 from rdblue/SPARK-14679-fix-ui-oom.
## What changes were proposed in this pull request?
Before this PR, we create accumulators on the driver side (and register them) and send them to the executor side, then create `TaskMetrics` with these accumulators on the executor side.
After this PR, we create `TaskMetrics` on the driver side and send it to the executor side, so that we can create accumulators inside `TaskMetrics` directly, which is cleaner.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #12472 from cloud-fan/acc.
## What changes were proposed in this pull request?
This PR adds support for specifying an optional custom coalescer to the `coalesce()` method. Currently I have only added this feature to the `RDD` interface, and once we sort out the details we can proceed with adding this feature to the other APIs (`Dataset` etc.)
## How was this patch tested?
Added a unit test for this functionality.
/cc rxin (per our discussion on the mailing list)
Author: Nezih Yigitbasi <nyigitbasi@netflix.com>
Closes #11865 from nezihyigitbasi/custom_coalesce_policy.
When `Await.result` throws an exception which originated from a different thread, the resulting stacktrace doesn't include the path leading to the `Await.result` call itself, making it difficult to identify the impact of these exceptions. For example, I've seen cases where broadcast cleaning errors propagate to the main thread and crash it but the resulting stacktrace doesn't include any of the main thread's code, making it difficult to pinpoint which exception crashed that thread.
This patch addresses this issue by explicitly catching, wrapping, and re-throwing exceptions that are thrown by `Await.result`.
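The catch, wrap, and re-throw pattern can be sketched in Java as follows. This is only an illustration of the idea: the helper name `awaitResult` and the message text are assumptions, and Java's `Future.get` already wraps failures in `ExecutionException`, unlike Scala's `Await.result`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class AwaitWrapSketch {
    /**
     * Waits for the future. On failure, throws a new exception created on the
     * waiting thread, so its stack trace includes the caller's frames, and keeps
     * the original failure as the cause.
     */
    static <T> T awaitResult(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (ExecutionException e) {
            throw new RuntimeException("Exception thrown in awaitResult", e.getCause());
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted in awaitResult", e);
        }
    }
}
```

The wrapper exception pinpoints which waiting thread was affected, while `getCause()` still exposes the original failure from the other thread.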
I tested this manually using 16b31c8251, a patch which reproduces an issue where an RPC exception which occurs while unpersisting RDDs manages to crash the main thread without any useful stacktrace, and verified that informative, full stacktraces were generated after applying the fix in this PR.
/cc rxin nongli yhuai anabranch
Author: Josh Rosen <joshrosen@databricks.com>
Closes #12433 from JoshRosen/wrap-and-rethrow-await-exceptions.
## What changes were proposed in this pull request?
The sort shuffle manager has been the default since Spark 1.2. It is time to remove the old hash shuffle manager.
## How was this patch tested?
Removed some tests related to the old manager.
Author: Reynold Xin <rxin@databricks.com>
Closes #12423 from rxin/SPARK-14667.
## What changes were proposed in this pull request?
This PR is a follow-up to https://github.com/apache/spark/pull/12417. We now always track input/output/shuffle metrics in the Spark JSON protocol and status API.
Most of the line changes are because of re-generating the gold answer for `HistoryServerSuite`, and we add a lot of 0 values for read/write metrics.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #12462 from cloud-fan/follow.
## What changes were proposed in this pull request?
When there are multiple tests running, "NettyBlockTransferServiceSuite.can bind to a specific port twice and the second increments" may fail.
E.g., assume there are 2 tests running. Here is an execution order that reproduces the test failure.
| Execution Order | Test 1 | Test 2 |
| ------------- | ------------- | ------------- |
| 1 | service0 binds to 17634 | |
| 2 | | service0 binds to 17635 (17634 is occupied) |
| 3 | service1 binds to 17636 | |
| 4 | pass test | |
| 5 | service0.close (release 17634) | |
| 6 | | service1 binds to 17634 |
| 7 | | `service1.port should be (service0.port + 1)` fails (17634 != 17635 + 1) |
Here is an example in Jenkins: https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test/job/spark-master-test-maven-hadoop-2.2/786/testReport/junit/org.apache.spark.network.netty/NettyBlockTransferServiceSuite/can_bind_to_a_specific_port_twice_and_the_second_increments/
This PR makes two changes:
- Use a random port between 17634 and 27634 to reduce the possibility of port conflicts.
- Make `service1` use `service0.port` to bind to avoid the above race condition.
## How was this patch tested?
Jenkins unit tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #12477 from zsxwing/SPARK-14713.
## What changes were proposed in this pull request?
This commit adds support for pluggable cluster managers. It also allows a cluster manager to clean up tasks without taking the parent process down.
To plug in a new external cluster manager, implement the ExternalClusterManager trait. It returns the task scheduler and backend scheduler that SparkContext will use to schedule tasks. An external cluster manager is registered using the java.util.ServiceLoader mechanism (the same mechanism that is used to register data sources such as parquet, json, and jdbc). This allows implementations of the ExternalClusterManager interface to be loaded automatically.
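As a rough sketch of how a `java.util.ServiceLoader` lookup of this kind works, consider the following. The `ClusterManagerPlugin` interface and its `canCreate` method are hypothetical stand-ins for illustration; the real trait's members may differ.

```java
import java.util.Optional;
import java.util.ServiceLoader;

final class ClusterManagerLookupSketch {
    /** Hypothetical stand-in for a pluggable cluster manager interface. */
    interface ClusterManagerPlugin {
        boolean canCreate(String masterUrl);
    }

    /**
     * Scans every implementation registered under META-INF/services on the
     * classpath and returns the first one that accepts the given master URL.
     */
    static Optional<ClusterManagerPlugin> find(String masterUrl) {
        for (ClusterManagerPlugin plugin : ServiceLoader.load(ClusterManagerPlugin.class)) {
            if (plugin.canCreate(masterUrl)) {
                return Optional.of(plugin);
            }
        }
        return Optional.empty();
    }
}
```

An implementation becomes discoverable by listing its class name in a `META-INF/services` file named after the interface; with no registration, the lookup simply returns an empty result.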
Currently, when a driver fails, executors exit using system.exit. This does not bode well for cluster managers that would like to reuse the parent process of an executor. Hence, this commit:
1. Moves system.exit to a function that can be overridden in subclasses of CoarseGrainedExecutorBackend.
2. Adds the ability to kill all the running tasks in an executor.
## How was this patch tested?
ExternalClusterManagerSuite.scala was added to test this patch.
Author: Hemant Bhanawat <hemant@snappydata.io>
Closes #11723 from hbhanawat/pluggableScheduler.
## What changes were proposed in this pull request?
Part of the reason why TaskMetrics and its callers are complicated is the optional metrics we collect, including input, output, shuffle read, and shuffle write. I think we can always track them and just assign 0 as the initial values. It is usually very obvious whether a task is supposed to read any data or not. By always tracking them, we can remove a lot of map, foreach, flatMap, getOrElse(0L) calls throughout Spark.
This patch also changes a few behaviors.
1. Removed the distinction of data read/write methods (e.g. Hadoop, Memory, Network, etc).
2. Accumulate all data reads and writes, rather than only the first method. (Fixes SPARK-5225)
## How was this patch tested?
existing tests.
This is based on https://github.com/apache/spark/pull/12388, with more test fixes.
Author: Reynold Xin <rxin@databricks.com>
Author: Wenchen Fan <wenchen@databricks.com>
Closes #12417 from cloud-fan/metrics-refactor.
## What changes were proposed in this pull request?
This patch removes some of the deprecated APIs in TaskMetrics. This is part of my bigger effort to simplify accumulators and task metrics.
## How was this patch tested?
N/A - only removals
Author: Reynold Xin <rxin@databricks.com>
Closes #12375 from rxin/SPARK-14617.
## What changes were proposed in this pull request?
When there are multiple attempts for a stage, we currently only reset internal accumulator values if all the tasks are resubmitted. It would make more sense to reset the accumulator values for each stage attempt. This will allow us to eventually get rid of the internal flag in the Accumulator class. This is part of my bigger effort to simplify accumulators and task metrics.
## How was this patch tested?
Covered by existing tests.
Author: Reynold Xin <rxin@databricks.com>
Closes #12378 from rxin/SPARK-14619.
## What changes were proposed in this pull request?
I was trying to understand the accumulator and metrics update source code and these two classes don't really need to be case classes. It would also be more consistent with other UI classes if they are not case classes. This is part of my bigger effort to simplify accumulators and task metrics.
## How was this patch tested?
This is a straightforward refactoring without behavior change.
Author: Reynold Xin <rxin@databricks.com>
Closes #12386 from rxin/SPARK-14625.
## What changes were proposed in this pull request?
According to the [Spark Code Style Guide](https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide) and [Scala Style Guide](http://docs.scala-lang.org/style/control-structures.html#curlybraces), we had better enforce the following rule.
```
case: Always omit braces in case clauses.
```
This PR adds a new ScalaStyle rule, 'OmitBracesInCase', and enforces it across the code.
## How was this patch tested?
Pass the Jenkins tests (including Scala style checking)
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #12280 from dongjoon-hyun/SPARK-14508.
## What changes were proposed in this pull request?
This adds a new API call `TaskContext.getLocalProperty` for getting properties set in the driver from executors. These local properties are automatically propagated from the driver to executors. For streaming, the context for streaming tasks will be the initial driver context when ssc.start() is called.
## How was this patch tested?
Unit tests.
cc JoshRosen
Author: Eric Liang <ekl@databricks.com>
Closes #12248 from ericl/sc-2813.
## What changes were proposed in this pull request?
When deciding whether a CommitDeniedException caused a task to fail, consider the root cause of the Exception.
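Extracting the root cause of an exception chain could look roughly like the sketch below. The helper name `rootCause` is an assumption, not necessarily the utility added by this patch.

```java
final class RootCauseSketch {
    /**
     * Follows getCause() links until the innermost throwable is reached.
     * Guards against a throwable that is its own cause; longer cycles are
     * not handled in this sketch.
     */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }
}
```

A caller can then test the returned throwable's type instead of the outermost wrapper when classifying a task failure.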
## How was this patch tested?
Added a test suite for the component that extracts the root cause of the error.
Made a distribution after cherry-picking this commit to branch-1.6 and used it to run our Spark application, which would quite often fail due to the CommitDeniedException.
Author: Jason Moore <jasonmoore2k@outlook.com>
Closes #12228 from jasonmoore2k/SPARK-14357.
## What changes were proposed in this pull request?
Here is why SPARK-14437 happens:
BlockManagerId is created using NettyBlockTransferService.hostName, which comes from `customHostname`. `Executor` sets `customHostname` to the hostname detected by the driver. However, the driver may not be able to detect the correct address in some complicated networks (Netty's Channel.remoteAddress doesn't always return a connectable address). In such cases, `BlockManagerId` will be created using a wrong hostname.
To fix this issue, this PR uses the `hostname` provided by `SparkEnv.create` to create `NettyBlockTransferService` and sets `NettyBlockTransferService.hostname` to it directly. A bonus of this approach is that NettyBlockTransferService won't bind to `0.0.0.0`, which is much safer.
## How was this patch tested?
Manually checked the bound address using local-cluster.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #12240 from zsxwing/SPARK-14437.
Currently all `SparkFirehoseListener` implementations are broken since we expect listeners to extend `SparkListener`, while the fire hose only extends `SparkListenerInterface`. This changes the addListener function and the config based injection to use the interface instead.
The existing tests in SparkListenerSuite are improved such that they would have caught this.
Follow-up to #12142
Author: Michael Armbrust <michael@databricks.com>
Closes #12227 from marmbrus/fixListener.
## What changes were proposed in this pull request?
`OutputCommitCoordinator` was introduced to deal with concurrent task attempts racing to write output, leading to data loss or corruption. For more detail, read the [JIRA description](https://issues.apache.org/jira/browse/SPARK-14468).
Before: `OutputCommitCoordinator` is enabled only if speculation is enabled.
After: `OutputCommitCoordinator` is always enabled.
Users may still disable this through `spark.hadoop.outputCommitCoordination.enabled`, but they really shouldn't...
## How was this patch tested?
`OutputCommitCoordinator*Suite`
Author: Andrew Or <andrew@databricks.com>
Closes #12244 from andrewor14/always-occ.
The current package name uses a dash, which is a little weird but seemed
to work. That is, until a new test tried to mock a class that references
one of those shaded types, and then things started failing.
Most changes are just noise to fix the logging configs.
For reference, SPARK-8815 also raised this issue, although at the time it
did not cause any issues in Spark, so it was not addressed.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #11941 from vanzin/SPARK-14134.
## What changes were proposed in this pull request?
Send `RegisterExecutorResponse` using `executorRef` to make sure that RegisterExecutorResponse and LaunchTask are both sent over the same channel. RegisterExecutorResponse will then always arrive before LaunchTask.
## How was this patch tested?
Existing unit tests
Closes #12078
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #12211 from zsxwing/SPARK-13112.
## What changes were proposed in this pull request?
As mentioned in the ticket, this was because one get path in the refactored `BlockManager` did not check for remote storage.
## How was this patch tested?
Unit test, also verified manually with reproduction in the ticket.
cc JoshRosen
Author: Eric Liang <ekl@databricks.com>
Closes #12193 from ericl/spark-14252.
Because SQL keeps track of all known configs, some customization was
needed in SQLConf to allow that, since the core API does not have that
feature.
Tested via existing (and slightly updated) unit tests.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #11570 from vanzin/SPARK-529-sql.
This change modifies the "assembly/" module to just copy needed
dependencies to its build directory, and modifies the packaging
script to pick those up (and remove duplicate jars packaged in the
examples module).
I also made some minor adjustments to dependencies to remove some
test jars from the final packaging, and remove jars that conflict with each
other when packaged separately (e.g. servlet api).
Also note that this change restores guava in applications' classpaths, even
though it's still shaded inside Spark. This is now needed for the Hadoop
libraries that are packaged with Spark, which now are not processed by
the shade plugin.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #11796 from vanzin/SPARK-13579.
## What changes were proposed in this pull request?
This PR contains the following 5 types of maintenance fixes over 59 files (+94 lines, -93 lines).
- Fix typos (exception/log strings, testcase name, comments) in 44 lines.
- Fix lint-java errors (MaxLineLength) in 6 lines. (New code after SPARK-14011)
- Use diamond operators in 40 lines. (New code after SPARK-13702)
- Fix redundant semicolon in 5 lines.
- Rename class `InferSchemaSuite` to `CSVInferSchemaSuite` in CSVInferSchemaSuite.scala.
## How was this patch tested?
Manual and pass the Jenkins tests.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #12139 from dongjoon-hyun/SPARK-14355.
## What changes were proposed in this pull request?
This special cases 0 and 1 counts to avoid passing 0 degrees of freedom.
## How was this patch tested?
Tests run successfully. New test added.
## Note:
This recreates #11982, which was closed due to a non-updated diff. rxin and srowen commented there.
This also adds tests, reworks the code to perform the special casing (based on srowen's comments), and adds equality machinery for BoundedDouble, as well as changing how it is transformed to string.
Author: Marcin Tustin <mtustin@handybook.com>
Author: Marcin Tustin <mtustin@handy.com>
Closes #12016 from mtustin-handy/SPARK-14163.
## What changes were proposed in this pull request?
Straggler references to Tachyon were removed:
- for docs, `tachyon` has been generalized as `off-heap memory`;
- for Mesos test suites, the key-value `tachyon:true`/`tachyon:false` has been changed to `os:centos`/`os:ubuntu`, since `os` is an example constraint used by the [Mesos official docs](http://mesos.apache.org/documentation/attributes-resources/).
## How was this patch tested?
Existing test suites.
Author: Liwei Lin <lwlin7@gmail.com>
Closes #12129 from lw-lin/tachyon-cleanup.
## What changes were proposed in this pull request?
This PR converts all Scala-style multiline comments into Java-style multiline comments in Scala code.
(All comment-only changes over 77 files: +786 lines, −747 lines)
## How was this patch tested?
Manual.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #12130 from dongjoon-hyun/use_multiine_javadoc_comments.
## What changes were proposed in this pull request?
Adds long values for each Date in the ApplicationAttemptInfo API for easier use in code.
## How was this patch tested?
Tested with dev/run-tests
Author: Alex Bozarth <ajbozart@us.ibm.com>
Closes #11326 from ajbozarth/spark13241.
This patch adds support for caching blocks in the executor processes using direct / off-heap memory.
## User-facing changes
**Updated semantics of `OFF_HEAP` storage level**: In Spark 1.x, the `OFF_HEAP` storage level indicated that an RDD should be cached in Tachyon. Spark 2.x removed the external block store API that Tachyon caching was based on (see #10752 / SPARK-12667), so `OFF_HEAP` became an alias for `MEMORY_ONLY_SER`. As of this patch, `OFF_HEAP` means "serialized and cached in off-heap memory or on disk". Via the `StorageLevel` constructor, `useOffHeap` can be set if `serialized == true` and can be used to construct custom storage levels which support replication.
**Storage UI reporting**: the storage UI will now report whether in-memory blocks are stored on- or off-heap.
**Only supported by UnifiedMemoryManager**: for simplicity, this feature is only supported when the default UnifiedMemoryManager is used; applications which use the legacy memory manager (`spark.memory.useLegacyMode=true`) are not currently able to allocate off-heap storage memory, so using off-heap caching will fail with an error when legacy memory management is enabled. Given that we plan to eventually remove the legacy memory manager, this is not a significant restriction.
**Memory management policies:** the policies for dividing available memory between execution and storage are the same for both on- and off-heap memory. For off-heap memory, the total amount of memory available for use by Spark is controlled by `spark.memory.offHeap.size`, which is an absolute size. Off-heap storage memory obeys `spark.memory.storageFraction` in order to control the amount of unevictable storage memory. For example, if `spark.memory.offHeap.size` is 1 gigabyte and Spark uses the default `storageFraction` of 0.5, then up to 500 megabytes of off-heap cached blocks will be protected from eviction due to execution memory pressure. If necessary, we can split `spark.memory.storageFraction` into separate on- and off-heap configurations, but this doesn't seem necessary now and can be done later without any breaking changes.
**Use of off-heap memory does not imply use of off-heap execution (or vice-versa)**: for now, the settings controlling the use of off-heap execution memory (`spark.memory.offHeap.enabled`) and off-heap caching are completely independent, so Spark SQL can be configured to use off-heap memory for execution while continuing to cache blocks on-heap. If desired, we can change this in a followup patch so that `spark.memory.offHeap.enabled` affects the default storage level for cached SQL tables.
## Internal changes
- Rename `ByteArrayChunkOutputStream` to `ChunkedByteBufferOutputStream`
- It now returns a `ChunkedByteBuffer` instead of an array of byte arrays.
- Its constructor now accepts an `allocator` function which is called to allocate `ByteBuffer`s. This allows us to control whether it allocates regular ByteBuffers or off-heap DirectByteBuffers.
- Because block serialization is now performed during the unroll process, a `ChunkedByteBufferOutputStream` which is configured with a `DirectByteBuffer` allocator will use off-heap memory for both unroll and storage memory.
- The `MemoryStore`'s MemoryEntries now track whether blocks are stored on- or off-heap.
- `evictBlocksToFreeSpace()` now accepts a `MemoryMode` parameter so that we don't try to evict off-heap blocks in response to on-heap memory pressure (or vice-versa).
- Make sure that off-heap buffers are properly de-allocated during MemoryStore eviction.
- The JVM limits the total size of allocated direct byte buffers using the `-XX:MaxDirectMemorySize` flag and the default tends to be fairly low (< 512 megabytes in some JVMs). To work around this limitation, this patch adds a custom DirectByteBuffer allocator which ignores this memory limit.
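The idea of passing an allocator function into a chunked output buffer can be sketched as follows. `ChunkedBufferSketch` is a hypothetical, simplified class, not the actual `ChunkedByteBufferOutputStream`, and it uses the standard `ByteBuffer.allocateDirect` rather than the custom allocator mentioned above.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

final class ChunkedBufferSketch {
    private final int chunkSize;
    private final IntFunction<ByteBuffer> allocator;
    private final List<ByteBuffer> chunks = new ArrayList<>();

    /** The allocator decides where each chunk lives (on-heap or off-heap). */
    ChunkedBufferSketch(int chunkSize, IntFunction<ByteBuffer> allocator) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.chunkSize = chunkSize;
        this.allocator = allocator;
    }

    /** Appends bytes, allocating a new chunk whenever the last one is full. */
    void write(byte[] data) {
        int offset = 0;
        while (offset < data.length) {
            ByteBuffer last = chunks.isEmpty() ? null : chunks.get(chunks.size() - 1);
            if (last == null || !last.hasRemaining()) {
                last = allocator.apply(chunkSize);
                chunks.add(last);
            }
            int n = Math.min(last.remaining(), data.length - offset);
            last.put(data, offset, n);
            offset += n;
        }
    }

    List<ByteBuffer> chunks() {
        return chunks;
    }
}
```

Passing `ByteBuffer::allocate` yields heap-backed chunks, while `ByteBuffer::allocateDirect` yields direct buffers, so the writing code stays the same for both memory modes.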
Author: Josh Rosen <joshrosen@databricks.com>
Closes #11805 from JoshRosen/off-heap-caching.
## What changes were proposed in this pull request?
Currently in Spark on YARN, configurations can be passed through SparkConf, environment variables, and command arguments, and some of them are duplicated, such as client arguments and SparkConf. This PR proposes simplifying the command arguments.
## How was this patch tested?
This patch was tested manually and with unit tests.
CC vanzin tgravescs, please comment on this proposal. The original purpose of this JIRA is to remove `ClientArguments`; however, some arguments such as `--class` and `--arg` are not easy to replace through refactoring, so I removed most of the command-line arguments and kept only a minimal set.
Author: jerryshao <sshao@hortonworks.com>
Closes #11603 from jerryshao/SPARK-12343.
## What changes were proposed in this pull request?
This PR uses `incUpdatedBlockStatuses` to update `updatedBlockStatuses` when removing blocks, making sure that `BlockManager` correctly updates `updatedBlockStatuses`.
## How was this patch tested?
test("updated block statuses") in BlockManagerSuite.scala
Author: jeanlyn <jeanlyn92@gmail.com>
Closes #12091 from jeanlyn/updateBlock.
## What changes were proposed in this pull request?
We have a streaming job using `FlumePollInputStream` whose driver always hits an OOM after a few days. Here is part of a driver heap dump taken before the OOM:
```
num #instances #bytes class name
----------------------------------------------
1: 13845916 553836640 org.apache.spark.storage.BlockStatus
2: 14020324 336487776 org.apache.spark.storage.StreamBlockId
3: 13883881 333213144 scala.collection.mutable.DefaultEntry
4: 8907 89043952 [Lscala.collection.mutable.HashEntry;
5: 62360 65107352 [B
6: 163368 24453904 [Ljava.lang.Object;
7: 293651 20342664 [C
...
```
`BlockStatus` and `StreamBlockId` keep growing, and the driver eventually runs out of memory.
After investigating, I found that `executorIdToStorageStatus` in `StorageStatusListener` never seems to remove blocks from `StorageStatus`.
To fix the issue, I use `onBlockUpdated` in place of `onTaskEnd`, so that block information is updated promptly (adding blocks, dropping blocks from memory to disk, and deleting blocks).
## How was this patch tested?
Existing unit tests and manual tests
Author: jeanlyn <jeanlyn92@gmail.com>
Closes #11779 from jeanlyn/fix_driver_oom.
## What changes were proposed in this pull request?
Extract the workaround for HADOOP-10622 introduced by #11940 into UninterruptibleThread so that we can test and reuse it.
## How was this patch tested?
Unit tests
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #11971 from zsxwing/uninterrupt.
## What changes were proposed in this pull request?
Currently, for keys that cannot fit within a long, we build a hash map for UnsafeHashedRelation, which is converted to a BytesToBytesMap after serialization and deserialization. We should build a BytesToBytesMap directly for better memory efficiency.
To do that, BytesToBytesMap must support multiple (K,V) pairs with the same K. `Location.putNewKey()` is renamed to `Location.append()`, which can append multiple values for the same key (same Location). `Location.newValue()` is added to find the next value for the same key.
## How was this patch tested?
Existing tests. Added benchmark for broadcast hash join with duplicated keys.
Author: Davies Liu <davies@databricks.com>
Closes #11870 from davies/map2.
JIRA: https://issues.apache.org/jira/browse/SPARK-13742
## What changes were proposed in this pull request?
`RandomSampler.sample` currently accepts an iterator as input and outputs another iterator. This makes it unsuitable for whole-stage codegen of the `Sampler` operator (#11517). This change adds a non-iterator interface to `RandomSampler`.
This change adds a new method `def sample(): Int` to the trait `RandomSampler`. Because we don't need to know the actual values of the items being sampled, the new method takes no arguments.
This method will decide whether to sample the next item or not. It returns how many times the next item will be sampled.
For `BernoulliSampler` and `BernoulliCellSampler`, the returned sampling times can only be 0 or 1. It simply means whether to sample the next item or not.
For `PoissonSampler`, the returned value can be more than 1, meaning the next item will be sampled multiple times.
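The contract described above can be sketched in Java as follows. The `ItemSampler` interface and both factory methods are hypothetical, and the Poisson variant uses Knuth's multiplication method purely for illustration; the actual `PoissonSampler` may draw its counts differently.

```java
import java.util.Random;

final class SamplerSketch {
    /** Returns how many times the next item should be emitted. */
    interface ItemSampler {
        int sample();
    }

    /** Bernoulli sampling: each item is kept once with the given probability, else dropped. */
    static ItemSampler bernoulli(double fraction, Random rng) {
        return () -> rng.nextDouble() < fraction ? 1 : 0;
    }

    /** Poisson sampling: each item is emitted k times, with k drawn from Poisson(mean). */
    static ItemSampler poisson(double mean, Random rng) {
        final double limit = Math.exp(-mean);
        return () -> {
            int k = 0;
            double product = rng.nextDouble();
            // Multiply uniform draws until the product falls to or below e^(-mean).
            while (product > limit) {
                k++;
                product *= rng.nextDouble();
            }
            return k;
        };
    }
}
```

A caller that already has an item in hand just calls `sample()` and emits the item that many times, with no iterator wrapping required.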
## How was this patch tested?
Tests are added into `RandomSamplerSuite`.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Author: Liang-Chi Hsieh <viirya@appier.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes #11578 from viirya/random-sampler-no-iterator.
This patch extends Spark's `UnifiedMemoryManager` to add bookkeeping support for off-heap storage memory, a requirement for enabling off-heap caching (which will be done by #11805). The `MemoryManager`'s `storageMemoryPool` has been split into separate on- and off-heap pools and the storage and unroll memory allocation methods have been updated to accept a `memoryMode` parameter to specify whether allocations should be performed on- or off-heap.
In order to reduce the testing surface, the `StaticMemoryManager` does not support off-heap caching (we plan to eventually remove the `StaticMemoryManager`, so this isn't a significant limitation).
Author: Josh Rosen <joshrosen@databricks.com>
Closes #11942 from JoshRosen/off-heap-storage-memory-bookkeeping.
When a block is persisted in the MemoryStore at a serialized storage level, the current MemoryStore.putIterator() code will unroll the entire iterator as Java objects in memory, then will turn around and serialize an iterator obtained from the unrolled array. This is inefficient and doubles our peak memory requirements.
Instead, I think that we should incrementally serialize blocks while unrolling them.
A downside to incremental serialization is the fact that we will need to deserialize the partially-unrolled data in case there is not enough space to unroll the block and the block cannot be dropped to disk. However, I'm hoping that the memory efficiency improvements will outweigh any performance losses as a result of extra serialization in that hopefully-rare case.
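A minimal sketch of serializing while iterating, rather than materializing all values first, is shown below. It uses plain Java serialization and a byte budget as a stand-in for unroll memory; the class and method names are assumptions and do not reflect the actual `MemoryStore` code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.Optional;

final class IncrementalSerializeSketch {
    /**
     * Serializes values one at a time and checks the budget after each record,
     * so at most one deserialized value is held alongside the serialized bytes.
     * Returns empty if the budget is exceeded partway through.
     */
    static Optional<byte[]> serializeWithinBudget(Iterator<? extends Serializable> values, int maxBytes) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            while (values.hasNext()) {
                out.writeObject(values.next());
                out.flush();
                if (bytes.size() > maxBytes) {
                    return Optional.empty();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Optional.of(bytes.toByteArray());
    }
}
```

When the budget is exceeded, the caller is left with partially serialized data, which corresponds to the case discussed above where the partial result may need to be deserialized again.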
Author: Josh Rosen <joshrosen@databricks.com>
Closes #11791 from JoshRosen/serialize-incrementally.
## What changes were proposed in this pull request?
A fix for local metrics tests that can fail on fast machines.
This is probably what is suggested here #3380 by aarondav?
## How was this patch tested?
CI Tests
Cheers
Author: Joan <joan@goyeau.com>
Closes #11747 from joan38/SPARK-2208-Local-metrics-tests.
## What changes were proposed in this pull request?
In case of failure in subprocess launched in PipedRDD, the failure exception reads “Subprocess exited with status XXX”. Debugging this is not easy for users especially if there are multiple pipe() operations in the Spark application.
Changes done:
- Changed the exception message when non-zero exit code is seen
- If the reader or writer thread sees an exception, simply log the command that was run. The current model is to propagate the exception "as is" so that upstream Spark logic will take the right action based on what the exception was (e.g. for a fetch failure, it needs to retry; but for some fatal exception, it will decide to fail the stage / job). So wrapping the exception with a generic exception will not work. Altering the exception message would keep that guarantee, but that is ugly (plus not all exceptions might have a constructor for a string message).
## How was this patch tested?
- Added a new test case
- Ran all existing tests for PipedRDD
Author: Tejas Patil <tejasp@fb.com>
Closes #11927 from tejasapatil/SPARK-14110-piperdd-failure.
This patch refactors the `MemoryStore` so that it can be tested without needing to construct / mock an entire `BlockManager`.
- The block manager's serialization- and compression-related methods have been moved from `BlockManager` to `SerializerManager`.
- `BlockInfoManager` is now passed directly to classes that need it, rather than being passed via the `BlockManager`.
- The `MemoryStore` now calls `dropFromMemory` via a new `BlockEvictionHandler` interface rather than directly calling the `BlockManager`. This change helps to enforce a narrow interface between the `MemoryStore` and `BlockManager` functionality and makes this interface easier to mock in tests.
- Several of the block unrolling tests have been moved from `BlockManagerSuite` into a new `MemoryStoreSuite`.
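The decoupling described in the list above can be illustrated with a small Python sketch. It is not the Scala code from this patch: the method signature, the oldest-first eviction policy, and the `RecordingHandler` test double are all assumptions made for the example. The point is only that the store depends on a one-method interface, which a test can satisfy without building a block manager.

```python
from typing import Dict, List, Protocol


class BlockEvictionHandler(Protocol):
    """The only capability the store needs from its owner (hypothetical signature)."""

    def drop_from_memory(self, block_id: str, data: bytes) -> None:
        ...


class MemoryStore:
    """Toy in-memory store that evicts the oldest blocks through the handler."""

    def __init__(self, max_bytes: int, handler: BlockEvictionHandler) -> None:
        self._max_bytes = max_bytes
        self._handler = handler
        self._blocks: Dict[str, bytes] = {}  # dicts preserve insertion order

    def put(self, block_id: str, data: bytes) -> bool:
        if len(data) > self._max_bytes:
            return False  # can never fit, so do not evict anything
        while self._used() + len(data) > self._max_bytes:
            victim_id = next(iter(self._blocks))
            self._handler.drop_from_memory(victim_id, self._blocks.pop(victim_id))
        self._blocks[block_id] = data
        return True

    def _used(self) -> int:
        return sum(len(b) for b in self._blocks.values())


class RecordingHandler:
    """Test double: records evictions instead of writing blocks anywhere."""

    def __init__(self) -> None:
        self.dropped: List[str] = []

    def drop_from_memory(self, block_id: str, data: bytes) -> None:
        self.dropped.append(block_id)
```

A test then constructs `MemoryStore(max_bytes=10, handler=RecordingHandler())`, fills it past capacity, and inspects `dropped`.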
Author: Josh Rosen <joshrosen@databricks.com>
Closes #11899 from JoshRosen/reduce-memorystore-blockmanager-coupling.
Building on the `SerializerManager` introduced in SPARK-13926 / #11755, this patch modifies Spark's BlockManager to use RDDs' ClassTags in order to select the best serializer to use when caching RDD blocks.
When storing a local block, the BlockManager `put()` methods use implicits to record ClassTags and store those tags in the blocks' BlockInfo records. When reading a local block, the stored ClassTag is used to pick the appropriate serializer. When a block is stored with replication, the class tag is written into the block transfer metadata and will also be stored in the remote BlockManager.
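The record-then-reuse pattern can be sketched as follows. This is a loose Python analogy, not Spark's API: Python has no implicits or ClassTags, so the element type is passed explicitly, the default `object` plays the role of `ClassTag.Any`, and both serializer classes and `serializer_for` are hypothetical.

```python
import pickle
from array import array


class PickleSerializer:
    """General-purpose fallback that handles any element type."""

    def dumps(self, values):
        return pickle.dumps(list(values))

    def loads(self, data):
        return pickle.loads(data)


class IntArraySerializer:
    """Compact encoding usable only when every element is a 64-bit int."""

    def dumps(self, values):
        return array("q", values).tobytes()

    def loads(self, data):
        out = array("q")
        out.frombytes(data)
        return out.tolist()


def serializer_for(elem_type):
    """Pick a serializer from the recorded element type."""
    return IntArraySerializer() if elem_type is int else PickleSerializer()


class BlockStore:
    def __init__(self):
        self._blocks = {}  # block_id -> (elem_type, serialized bytes)

    def put(self, block_id, values, elem_type=object):
        # The type tag is stored next to the bytes, like a BlockInfo record.
        data = serializer_for(elem_type).dumps(values)
        self._blocks[block_id] = (elem_type, data)

    def get(self, block_id):
        # The stored tag, not the caller, decides how to deserialize.
        elem_type, data = self._blocks[block_id]
        return serializer_for(elem_type).loads(data)
```

Because the tag travels with the block, a reader never has to guess which serializer wrote it; the same tag would need to accompany the bytes when a block is replicated.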
There are two or three places where we don't properly pass ClassTags, including TorrentBroadcast and BlockRDD. I think this happens to work because the missing ClassTag always happens to be `ClassTag.Any`, but it might be worth looking more carefully at those places to see whether we should be more explicit.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #11801 from JoshRosen/pick-best-serializer-for-caching.
## What changes were proposed in this pull request?
[Spark Coding Style Guide](https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide) has a 100-character limit on lines, but the check has been disabled for Java since 11/09/15. This PR enables the **LineLength** checkstyle rule again. To help with that, it also introduces **RedundantImport** and **RedundantModifier**. The following is the diff on `checkstyle.xml`.
```xml
- <!-- TODO: 11/09/15 disabled - the lengths are currently > 100 in many places -->
- <!--
<module name="LineLength">
<property name="max" value="100"/>
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
</module>
- -->
<module name="NoLineWrap"/>
<module name="EmptyBlock">
<property name="option" value="TEXT"/>
@@ -167,5 +164,7 @@
</module>
<module name="CommentsIndentation"/>
<module name="UnusedImports"/>
+ <module name="RedundantImport"/>
+ <module name="RedundantModifier"/>
```
## How was this patch tested?
Currently, `lint-java` is disabled in Jenkins, so this needs a manual test.
After passing the Jenkins tests, `dev/lint-java` should pass locally.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #11831 from dongjoon-hyun/SPARK-14011.
This patch modifies the BlockManager, MemoryStore, and several other storage components so that serialized cached blocks are stored as multiple small chunks rather than as a single contiguous ByteBuffer.
This change will help to improve the efficiency of memory allocation and the accuracy of memory accounting when serializing blocks. Our current serialization code uses a ByteBufferOutputStream, which doubles and re-allocates its backing byte array; this increases the peak memory requirements during serialization (since we need to hold extra memory while expanding the array). In addition, we currently don't account for the extra wasted space at the end of the ByteBuffer's backing array, so a 129 megabyte serialized block may actually consume 256 megabytes of memory. After switching to storing blocks in multiple chunks, we'll be able to efficiently trim the backing buffers so that no space is wasted.
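The arithmetic behind the 129 megabyte example can be checked with a short sketch. It is plain Python, not Spark code, and rests on stated assumptions: the buffer starts at 32 bytes and exactly doubles on each growth, and the 4 MB chunk size is a hypothetical value picked for illustration.

```python
MB = 1024 * 1024


def doubling_growth(size, initial=32):
    """Return (final_capacity, peak_bytes) for a buffer that doubles until
    it can hold `size` bytes.

    peak_bytes counts the old and new arrays both being live while the
    contents are copied during the last expansion.
    """
    capacity = initial
    peak = capacity
    while capacity < size:
        peak = capacity + capacity * 2  # old array + new array
        capacity *= 2
    return capacity, peak


def chunk_sizes(size, chunk_size, trim=True):
    """Sizes of the chunks needed to hold `size` bytes.

    With trim=True the last chunk is cut down to the bytes actually written.
    """
    full, rest = divmod(size, chunk_size)
    sizes = [chunk_size] * full
    if rest:
        sizes.append(rest if trim else chunk_size)
    return sizes


capacity, peak = doubling_growth(129 * MB)
chunked_total = sum(chunk_sizes(129 * MB, 4 * MB))
```

Under these assumptions `capacity` is 256 MB and `peak` is 384 MB, while `chunked_total` is exactly 129 MB; even without trimming, the chunked layout wastes less than one chunk.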
This change is also a prerequisite to being able to cache blocks which are larger than 2GB (although full support for that depends on several other changes which have not been implemented yet).
Author: Josh Rosen <joshrosen@databricks.com>
Closes #11748 from JoshRosen/chunked-block-serialization.