this change rejects offers for slaves with unmet constraints for 120s to mitigate offer starvation.
this prevents mesos to send us these offers again and again.
in return, we get more offers for slaves which might meet our constraints.
and it enables mesos to send the rejected offers to other frameworks.
Author: Felix Bechstein <felix.bechstein@otto.de>
Closes#8639 from felixb/decline_offers_constraint_mismatch.
As shown in https://amplab.cs.berkeley.edu/jenkins/view/Spark-QA-Compile/job/Spark-Master-Scala211-Compile/1946/console , compilation fails with:
```
[error] /home/jenkins/workspace/Spark-Master-Scala211-Compile/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala:25: in class RDDInfo, multiple overloaded alternatives of constructor RDDInfo define default arguments.
[error] class RDDInfo(
[error]
```
This PR tries to fix the compilation error
Author: tedyu <yuzhihong@gmail.com>
Closes#9538 from tedyu/master.
A few changes:
1. Removed fold, since it can be confusing for distributed collections.
2. Created specific interfaces for each Dataset function (e.g. MapFunction, ReduceFunction, MapPartitionsFunction)
3. Added more documentation and test cases.
The other thing I'm considering doing is to have a "collector" interface for FlatMapFunction and MapPartitionsFunction, similar to MapReduce's map function.
Author: Reynold Xin <rxin@databricks.com>
Closes#9531 from rxin/SPARK-11564.
In order to lay the groundwork for proper off-heap memory support in SQL / Tungsten, we need to extend our MemoryManager to perform bookkeeping for off-heap memory.
## User-facing changes
This PR introduces a new configuration, `spark.memory.offHeapSize` (name subject to change), which specifies the absolute amount of off-heap memory that Spark and Spark SQL can use. If Tungsten is configured to use off-heap execution memory for allocating data pages, then all data page allocations must fit within this size limit.
## Internals changes
This PR contains a lot of internal refactoring of the MemoryManager. The key change at the heart of this patch is the introduction of a `MemoryPool` class (name subject to change) to manage the bookkeeping for a particular category of memory (storage, on-heap execution, and off-heap execution). These MemoryPools are not fixed-size; they can be dynamically grown and shrunk according to the MemoryManager's policies. In StaticMemoryManager, these pools have fixed sizes, proportional to the legacy `[storage|shuffle].memoryFraction`. In the new UnifiedMemoryManager, the sizes of these pools are dynamically adjusted according to its policies.
There are two subclasses of `MemoryPool`: `StorageMemoryPool` manages storage memory and `ExecutionMemoryPool` manages execution memory. The MemoryManager creates two execution pools, one for on-heap memory and one for off-heap. Instances of `ExecutionMemoryPool` manage the logic for fair sharing of their pooled memory across running tasks (in other words, the ShuffleMemoryManager-like logic has been moved out of MemoryManager and pushed into these ExecutionMemoryPool instances).
I think that this design is substantially easier to understand and reason about than the previous design, where most of these responsibilities were handled by MemoryManager and its subclasses. To see this, take at look at how simple the logic in `UnifiedMemoryManager` has become: it's now very easy to see when memory is dynamically shifted between storage and execution.
## TODOs
- [x] Fix handful of test failures in the MemoryManagerSuites.
- [x] Fix remaining TODO comments in code.
- [ ] Document new configuration.
- [x] Fix commented-out tests / asserts:
- [x] UnifiedMemoryManagerSuite.
- [x] Write tests that exercise the new off-heap memory management policies.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9344 from JoshRosen/offheap-memory-accounting.
https://issues.apache.org/jira/browse/SPARK-10116
This is really trivial, just happened to notice it -- if `XORShiftRandom.hashSeed` is really supposed to have random bits throughout (as the comment implies), it needs to do something for the conversion to `long`.
mengxr mkolod
Author: Imran Rashid <irashid@cloudera.com>
Closes#8314 from squito/SPARK-10116.
This brings the support of off-heap memory for array inside BytesToBytesMap and InMemorySorter, then we could allocate all the memory from off-heap for execution.
Closes#8068
Author: Davies Liu <davies@databricks.com>
Closes#9477 from davies/unsafe_timsort.
Use the proxyBase set by the AM, if not found then use env. This is to fix the issue if somebody accidentally set APPLICATION_WEB_PROXY_BASE to wrong proxyBase
Author: Srinivasa Reddy Vundela <vsr@cloudera.com>
Closes#9448 from vundela/master.
spark.rpc is supposed to be configurable but is not currently (doesn't get propagated to executors because RpcEnv.create is done before driver properties are fetched).
Author: Nishkam Ravi <nishkamravi@gmail.com>
Closes#9460 from nishkamravi2/master_akka.
```PortableDataStream``` maintains some internal state. This makes it tricky to reuse a stream (one needs to call ```close``` on both the ```PortableDataStream``` and the ```InputStream``` it produces).
This PR removes all state from ```PortableDataStream``` and effectively turns it into an ```InputStream```/```Array[Byte]``` factory. This makes the user responsible for managing the ```InputStream``` it returns.
cc srowen
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9417 from hvanhovell/SPARK-11449.
After aggregation, the dataset could be smaller than inputs, so it's better to do hash based aggregation for all inputs, then using sort based aggregation to merge them.
Author: Davies Liu <davies@databricks.com>
Closes#9383 from davies/fix_switch.
OutputCommitCoordinator uses a map in a place where an array would suffice, increasing its memory consumption for result stages with millions of tasks.
This patch replaces that map with an array. The only tricky part of this is reasoning about the range of possible array indexes in order to make sure that we never index out of bounds.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9274 from JoshRosen/SPARK-11307.
Since we have 4 bytes as number of records in the beginning of a page, the address can not be zero, so we do not need the bitset.
For performance concerns, the bitset could help speed up false lookup if the slot is empty (because bitset is smaller than longArray, cache hit rate will be higher). In practice, the map is filled with 35% - 70% (use 50% as average), so only half of the false lookups can benefit of it, all others will pay the cost of load the bitset (still need to access the longArray anyway).
For aggregation, we always need to access the longArray (insert a new key after false lookup), also confirmed by a benchmark.
For broadcast hash join, there could be a regression, but a simple benchmark showed that it may not (most of lookup are false):
```
sqlContext.range(1<<20).write.parquet("small")
df = sqlContext.read.parquet('small')
for i in range(3):
t = time.time()
df2 = sqlContext.range(1<<26).selectExpr("id * 1111111111 % 987654321 as id2")
df2.join(df, df.id == df2.id2).count()
print time.time() -t
```
Having bitset (used time in seconds):
```
17.5404241085
10.2758829594
10.5786800385
```
After removing bitset (used time in seconds):
```
21.8939979076
12.4132959843
9.97224712372
```
cc rxin nongli
Author: Davies Liu <davies@databricks.com>
Closes#9452 from davies/remove_bitset.
This is an updated version of #8995 by a-roberts. Original description follows:
Snappy now supports concatenation of serialized streams, this patch contains a version number change and the "does not support" test is now a "supports" test.
Snappy 1.1.2 changelog mentions:
> snappy-java-1.1.2 (22 September 2015)
> This is a backward compatible release for 1.1.x.
> Add AIX (32-bit) support.
> There is no upgrade for the native libraries of the other platforms.
> A major change since 1.1.1 is a support for reading concatenated results of SnappyOutputStream(s)
> snappy-java-1.1.2-RC2 (18 May 2015)
> Fix#107: SnappyOutputStream.close() is not idempotent
> snappy-java-1.1.2-RC1 (13 May 2015)
> SnappyInputStream now supports reading concatenated compressed results of SnappyOutputStream
> There has been no compressed format change since 1.0.5.x. So You can read the compressed results > interchangeablly between these versions.
> Fixes a problem when java.io.tmpdir does not exist.
Closes#8995.
Author: Adam Roberts <aroberts@uk.ibm.com>
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9439 from JoshRosen/update-snappy.
functions.scala was getting pretty long. I broke it into multiple files.
I also added explicit data types for some public vals, and renamed aggregate function pretty names to lower case, which is more consistent with rest of the functions.
Author: Reynold Xin <rxin@databricks.com>
Closes#9471 from rxin/SPARK-11505.
In YARN mode, when preemption is enabled, we may leave executors in a
zombie state while we wait to retrieve the reason for which the executor
exited. This is so that we don't account for failed tasks that were
running on a preempted executor.
The issue is that while we wait for this information, the scheduler
might decide to schedule tasks on the executor, which will never be
able to run them. Other side effects include the block manager still
considering the executor available to cache blocks, for example.
So, when we know that an executor went down but we don't know why,
stop everything related to the executor, except its running tasks.
Only when we know the reason for the exit (or give up waiting for
it) we do update the running tasks.
This is achieved by a new `disableExecutor()` method in the
`Schedulable` interface. For managers that do not behave like this
(i.e. every one but YARN), the existing `executorLost()` method
will behave the same way it did before.
On top of that change, a few minor changes that made debugging easier,
and fixed some other minor issues:
- The cluster-mode AM was printing a misleading log message every
time an executor disconnected from the driver (because the akka
actor system was shared between driver and AM).
- Avoid sending unnecessary requests for an executor's exit reason
when we already know it was explicitly disabled / killed. This
avoids both multiple requests, and unnecessary requests that would
just cause warning messages on the AM (in the explicit kill case).
- Tone down a log message about the executor being lost when it
exited normally (e.g. preemption)
- Wake up the AM monitor thread when requests for executor loss
reasons arrive too, so that we can more quickly remove executors
from this zombie state.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#8887 from vanzin/SPARK-10622.
The test functionality should be the same, but without using mockito; logs don't
really say anything useful but I suspect it may be the cause of the flakiness,
since updating mocks when multiple threads may be using it doesn't work very
well. It also allows some other cleanup (= less test code in FsHistoryProvider).
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9425 from vanzin/SPARK-11466.
DriverDescription refactored to case class because it included no mutable fields.
ApplicationDescription had one mutable field, which was appUiUrl. This field was set by the driver to point to the driver web UI. Master was modifying this field when the application was removed to redirect requests to history server. This was wrong because objects which are sent over the wire should be immutable. Now appUiUrl is immutable in ApplicationDescription and always points to the driver UI even if it is already shutdown. The UI url which master exposes to the user and modifies dynamically is now included into ApplicationInfo - a data object which describes the application state internally in master. That URL in ApplicationInfo is initialised with the value from ApplicationDescription.
ApplicationDescription also included value user, which is now a part of case class fields.
Author: Jacek Lewandowski <lewandowski.jacek@gmail.com>
Closes#9299 from jacek-lewandowski/SPARK-11344.
"Client mode" means the RPC env will not listen for incoming connections.
This allows certain processes in the Spark stack (such as Executors or
tha YARN client-mode AM) to act as pure clients when using the netty-based
RPC backend, reducing the number of sockets needed by the app and also the
number of open ports.
Client connections are also preferred when endpoints that actually have
a listening socket are involved; so, for example, if a Worker connects
to a Master and the Master needs to send a message to a Worker endpoint,
that client connection will be used, even though the Worker is also
listening for incoming connections.
With this change, the workaround for SPARK-10987 isn't necessary anymore, and
is removed. The AM connects to the driver in "client mode", and that connection
is used for all driver <-> AM communication, and so the AM is properly notified
when the connection goes down.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9210 from vanzin/SPARK-10997.
JIRA: https://issues.apache.org/jira/browse/SPARK-11271
As reported in the JIRA ticket, when there are too many tasks, the memory usage of MapStatus will cause problem. Use BitSet instead of RoaringBitMap should be more efficient in memory usage.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#9243 from viirya/mapstatus-bitset.
Use standard JDK APIs for that (with a little help from Guava). Most of the
changes here are in test code, since there were no tests specific to that
part of the code.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9257 from vanzin/SPARK-11073.
Large HDFS clusters may take a while to leave safe mode when starting; this change
makes the HS wait for that before doing checks about its configuraton. This means
the HS won't stop right away if HDFS is in safe mode and the configuration is not
correct, but that should be a very uncommon situation.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9043 from vanzin/SPARK-11020.
[SPARK-11338: HistoryPage not multi-tenancy enabled ...](https://issues.apache.org/jira/browse/SPARK-11338)
- `HistoryPage.scala` ...prepending all page links with the web proxy (`uiRoot`) path
- `HistoryServerSuite.scala` ...adding a test case to verify all site-relative links are prefixed when the environment variable `APPLICATION_WEB_PROXY_BASE` (or System property `spark.ui.proxyBase`) is set
Author: Christian Kadner <ckadner@us.ibm.com>
Closes#9291 from ckadner/SPARK-11338 and squashes the following commits:
01d2f35 [Christian Kadner] [SPARK-11338][WebUI] nit fixes
d054bd7 [Christian Kadner] [SPARK-11338][WebUI] prependBaseUri in method makePageLink
8bcb3dc [Christian Kadner] [SPARK-11338][WebUI] Prepend application links on HistoryPage with uiRoot path
**TL;DR**: We can rule out one rare but potential cause of input stream corruption via defensive programming.
## Background
[MAPREDUCE-5918](https://issues.apache.org/jira/browse/MAPREDUCE-5918) is a bug where an instance of a decompressor ends up getting placed into a pool multiple times. Since the pool is backed by a list instead of a set, this can lead to the same decompressor being used in different places at the same time, which is not safe because those decompressors will overwrite each other's buffers. Sometimes this buffer sharing will lead to exceptions but other times it will might silently result in invalid / garbled input.
That Hadoop bug is fixed in Hadoop 2.7 but is still present in many Hadoop versions that we wish to support. As a result, I think that we should try to work around this issue in Spark via defensive programming to prevent RecordReaders from being closed multiple times.
So far, I've had a hard time coming up with explanations of exactly how double-`close()`s occur in practice, but I do have a couple of explanations that work on paper.
For instance, it looks like https://github.com/apache/spark/pull/7424, added in 1.5, introduces at least one extremely~rare corner-case path where Spark could double-close() a LineRecordReader instance in a way that triggers the bug. Here are the steps involved in the bad execution that I brainstormed up:
* [The task has finished reading input, so we call close()](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L168).
* [While handling the close call and trying to close the reader, reader.close() throws an exception]( https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L190)
* We don't set `reader = null` after handling this exception, so the [TaskCompletionListener also ends up calling NewHadoopRDD.close()](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L156), which, in turn, closes the record reader again.
In this hypothetical situation, `LineRecordReader.close()` could [fail with an exception if its InputStream failed to close](https://github.com/apache/hadoop/blob/release-1.2.1/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java#L212).
I googled for "Exception in RecordReader.close()" and it looks like it's possible for a closed Hadoop FileSystem to trigger an error there: [SPARK-757](https://issues.apache.org/jira/browse/SPARK-757), [SPARK-2491](https://issues.apache.org/jira/browse/SPARK-2491)
Looking at [SPARK-3052](https://issues.apache.org/jira/browse/SPARK-3052), it seems like it's possible to get spurious exceptions there when there is an error reading from Hadoop. If the Hadoop FileSystem were to get into an error state _right_ after reading the last record then it looks like we could hit the bug here in 1.5.
## The fix
This patch guards against these issues by modifying `HadoopRDD.close()` and `NewHadoopRDD.close()` so that they set `reader = null` even if an exception occurs in the `reader.close()` call. In addition, I modified `NextIterator. closeIfNeeded()` to guard against double-close if the first `close()` call throws an exception.
I don't have an easy way to test this, since I haven't been able to reproduce the bug that prompted this patch, but these changes seem safe and seem to rule out the on-paper reproductions that I was able to brainstorm up.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9382 from JoshRosen/hadoop-decompressor-pooling-fix and squashes the following commits:
5ec97d7 [Josh Rosen] Add SqlNewHadoopRDD.unsetInputFileName() that I accidentally deleted.
ae46cf4 [Josh Rosen] Merge remote-tracking branch 'origin/master' into hadoop-decompressor-pooling-fix
087aa63 [Josh Rosen] Guard against double-close() of RecordReaders.
Since we do not need to preserve a page before calling compute(), MapPartitionsWithPreparationRDD is not needed anymore.
This PR basically revert #8543, #8511, #8038, #8011
Author: Davies Liu <davies@databricks.com>
Closes#9381 from davies/remove_prepare2.
See [SPARK-10986](https://issues.apache.org/jira/browse/SPARK-10986) for details.
This fixes the `ClassNotFoundException` for Spark classes in the serializer.
I am not sure this is the right way to handle the class loader, but I couldn't find any documentation on how the context class loader is used and who relies on it. It seems at least the serializer uses it to instantiate classes during deserialization.
I am open to suggestions (I tried this fix on a real Mesos cluster and it *does* fix the issue).
tnachen andrewor14
Author: Iulian Dragos <jaguarul@gmail.com>
Closes#9282 from dragos/issue/mesos-classloader.
This PR introduce a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is needed anymore, so removed.
Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but those could benefit from this (trigger others' spilling).
The PrepareRDD may be not needed anymore, could be removed in follow up PR.
The following script will fail with OOM before this PR, finished in 150 seconds with 2G heap (also works in 1.5 branch, with similar duration).
```python
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s")
df2 = df.select(df.id.alias('id2'), df.s.alias('s2'))
j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2")
j.explain()
print j.count()
```
For thread-safety, here what I'm got:
1) Without calling spill(), the operators should only be used by single thread, no safety problems.
2) spill() could be triggered in two cases, triggered by itself, or by other operators. we can check trigger == this in spill(), so it's still in the same thread, so safety problems.
3) if it's triggered by other operators (right now cache will not trigger spill()), we only spill the data into disk when it's in scanning stage (building is finished), so the in-memory sorter or memory pages are read-only, we only need to synchronize the iterator and change it.
4) During scanning, the iterator will only use one record in one page, we can't free this page, because the downstream is currently using it (used by UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page, and dump all others into disk. In UnsafeExternalSorter, we keep the page that is used by current record (having the same baseObject), free it when loading the next record. In ShuffleExternalSorter, the spill() will not trigger during scanning.
5) In order to avoid deadlock, we didn't call acquireMemory during spill (so we reused the pointer array in InMemorySorter).
Author: Davies Liu <davies@databricks.com>
Closes#9241 from davies/force_spill.
Commit af3bc59d1f introduced new
functionality so that if an executor dies for a reason that's not
caused by one of the tasks running on the executor (e.g., due to
pre-emption), Spark doesn't count the failure towards the maximum
number of failures for the task. That commit introduced some vague
naming that this commit attempts to fix; in particular:
(1) The variable "isNormalExit", which was used to refer to cases where
the executor died for a reason unrelated to the tasks running on the
machine, has been renamed (and reversed) to "exitCausedByApp". The problem
with the existing name is that it's not clear (at least to me!) what it
means for an exit to be "normal"; the new name is intended to make the
purpose of this variable more clear.
(2) The variable "shouldEventuallyFailJob" has been renamed to
"countTowardsTaskFailures". This variable is used to determine whether
a task's failure should be counted towards the maximum number of failures
allowed for a task before the associated Stage is aborted. The problem
with the existing name is that it can be confused with implying that
the task's failure should immediately cause the stage to fail because it
is somehow fatal (this is the case for a fetch failure, for example: if
a task fails because of a fetch failure, there's no point in retrying,
and the whole stage should be failed).
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes#9164 from kayousterhout/SPARK-11178.
… ReceiverTracker and ReceiverSchedulingPolicy to use it
This PR includes the following changes:
1. Add a new preferred location format, `executor_<host>_<executorID>` (e.g., "executor_localhost_2"), to support specifying the executor locations for RDD.
2. Use the new preferred location format in `ReceiverTracker` to optimize the starting time of Receivers when there are multiple executors in a host.
The goal of this PR is to enable the streaming scheduler to place receivers (which run as tasks) in specific executors. Basically, I want to have more control on the placement of the receivers such that they are evenly distributed among the executors. We tried to do this without changing the core scheduling logic. But it does not allow specifying particular executor as preferred location, only at the host level. So if there are two executors in the same host, and I want two receivers to run on them (one on each executor), I cannot specify that. Current code only specifies the host as preference, which may end up launching both receivers on the same executor. We try to work around it but restarting a receiver when it does not launch in the desired executor and hope that next time it will be started in the right one. But that cause lots of restarts, and delays in correctly launching the receiver.
So this change, would allow the streaming scheduler to specify the exact executor as the preferred location. Also this is not exposed to the user, only the streaming scheduler uses this.
Author: zsxwing <zsxwing@gmail.com>
Closes#9181 from zsxwing/executor-location.
This commit fixes a bug where, in Standalone mode, if a task fails and crashes the JVM, the
failure is considered a "normal failure" (meaning it's considered unrelated to the task), so
the failure isn't counted against the task's maximum number of failures:
af3bc59d1f (diff-a755f3d892ff2506a7aa7db52022d77cL138).
As a result, if a task fails in a way that results in it crashing the JVM, it will continuously be
re-launched, resulting in a hang. This commit fixes that problem.
This bug was introduced by #8007; andrewor14 mccheah vanzin can you take a look at this?
This error is hard to trigger because we handle executor losses through 2 code paths (the second is via Akka, where Akka notices that the executor endpoint is disconnected). In my setup, the Akka code path completes first, and doesn't have this bug, so things work fine (see my recent email to the dev list about this). If I manually disable the Akka code path, I can see the hang (and this commit fixes the issue).
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes#9273 from kayousterhout/SPARK-11306.
The SizeEstimator keeps a cache of ClassInfos but this cache uses Class objects as keys.
Which results in strong references to the Class objects. If these classes are dynamically created
this prevents the corresponding ClassLoader from being GCed. Leading to PermGen exhaustion.
We use a Map with WeakKeys to prevent this issue.
Author: Sem Mulder <sem.mulder@site2mobile.com>
Closes#9244 from SemMulder/fix-sizeestimator-classunloading.
This patch refactors the MemoryManager class structure. After #9000, Spark had the following classes:
- MemoryManager
- StaticMemoryManager
- ExecutorMemoryManager
- TaskMemoryManager
- ShuffleMemoryManager
This is fairly confusing. To simplify things, this patch consolidates several of these classes:
- ShuffleMemoryManager and ExecutorMemoryManager were merged into MemoryManager.
- TaskMemoryManager is moved into Spark Core.
**Key changes and tasks**:
- [x] Merge ExecutorMemoryManager into MemoryManager.
- [x] Move pooling logic into Allocator.
- [x] Move TaskMemoryManager from `spark-unsafe` to `spark-core`.
- [x] Refactor the existing Tungsten TaskMemoryManager interactions so Tungsten code use only this and not both this and ShuffleMemoryManager.
- [x] Refactor non-Tungsten code to use the TaskMemoryManager instead of ShuffleMemoryManager.
- [x] Merge ShuffleMemoryManager into MemoryManager.
- [x] Move code
- [x] ~~Simplify 1/n calculation.~~ **Will defer to followup, since this needs more work.**
- [x] Port ShuffleMemoryManagerSuite tests.
- [x] Move classes from `unsafe` package to `memory` package.
- [ ] Figure out how to handle the hacky use of the memory managers in HashedRelation's broadcast variable construction.
- [x] Test porting and cleanup: several tests relied on mock functionality (such as `TestShuffleMemoryManager.markAsOutOfMemory`) which has been changed or broken during the memory manager consolidation
- [x] AbstractBytesToBytesMapSuite
- [x] UnsafeExternalSorterSuite
- [x] UnsafeFixedWidthAggregationMapSuite
- [x] UnsafeKVExternalSorterSuite
**Compatiblity notes**:
- This patch introduces breaking changes in `ExternalAppendOnlyMap`, which is marked as `DevloperAPI` (likely for legacy reasons): this class now cannot be used outside of a task.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9127 from JoshRosen/SPARK-10984.
Executing deploy.client.TestClient fails due to bad class name for TestExecutor in ApplicationDescription.
Author: Bryan Cutler <bjcutler@us.ibm.com>
Closes#9255 from BryanCutler/fix-TestClient-classname-SPARK-11287.
Two typos squashed.
BTW Let me know how to proceed with other typos if I ran across any. I don't feel well to leave them aside as much as sending pull requests with such tiny changes. Guide me.
Author: Jacek Laskowski <jacek.laskowski@deepsense.io>
Closes#9250 from jaceklaskowski/typos-hunting.
…ut building with -Phive-thriftserver and SPARK_PREPEND_CLASSES is set
This is the exception after this patch. Please help review.
```
java.lang.NoClassDefFoundError: org/apache/hadoop/hive/cli/CliDriver
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:412)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.util.Utils$.classForName(Utils.scala:173)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:647)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.cli.CliDriver
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 21 more
Failed to load hive class.
You need to build Spark with -Phive and -Phive-thriftserver.
```
Author: Jeff Zhang <zjffdu@apache.org>
Closes#9134 from zjffdu/SPARK-11125.
This test can take a little while to finish on slow / loaded machines.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9235 from vanzin/SPARK-11134.
The current NettyRpc has a message order issue because it uses a thread pool to send messages. E.g., running the following two lines in the same thread,
```
ref.send("A")
ref.send("B")
```
The remote endpoint may see "B" before "A" because sending "A" and "B" are in parallel.
To resolve this issue, this PR added an outbox for each connection, and if we are connecting to the remote node when sending messages, just cache the sending messages in the outbox and send them one by one when the connection is established.
Author: zsxwing <zsxwing@gmail.com>
Closes#9197 from zsxwing/rpc-outbox.
```
// My machine only has 8 cores
$ bin/spark-shell --master local[32]
scala> val df = sc.parallelize(Seq((1, 1), (2, 2))).toDF("a", "b")
scala> df.as("x").join(df.as("y"), $"x.a" === $"y.a").count()
Caused by: java.io.IOException: Unable to acquire 2097152 bytes of memory
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
```
Author: Andrew Or <andrew@databricks.com>
Closes#9209 from andrewor14/fix-local-page-size.
This commit removes unnecessary calls to addPendingTask in
TaskSetManager.executorLost. These calls are unnecessary: for
tasks that are still pending and haven't been launched, they're
still in all of the correct pending lists, so calling addPendingTask
has no effect. For tasks that are currently running (which may still be
in the pending lists, depending on how they were scheduled), we call
addPendingTask in handleFailedTask, so the calls at the beginning
of executorLost are redundant.
I think these calls are left over from when we re-computed the locality
levels in addPendingTask; now that we call recomputeLocality separately,
I don't think these are necessary.
Now that those calls are removed, the readding parameter in addPendingTask
is no longer necessary, so this commit also removes that parameter.
markhamstra can you take a look at this?
cc vanzin
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes#9154 from kayousterhout/SPARK-11163.
The current `NettyRpcEndpointRef.send` can be interrupted because it uses `LinkedBlockingQueue.put`, which may hang the application.
Image the following execution order:
| thread 1: TaskRunner.kill | thread 2: TaskRunner.run
------------- | ------------- | -------------
1 | killed = true |
2 | | if (killed) {
3 | | throw new TaskKilledException
4 | | case _: TaskKilledException _: InterruptedException if task.killed =>
5 | task.kill(interruptThread): interruptThread is true |
6 | | execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
7 | | localEndpoint.send(StatusUpdate(taskId, state, serializedData)): in LocalBackend
Then `localEndpoint.send(StatusUpdate(taskId, state, serializedData))` will throw `InterruptedException`. This will prevent the executor from updating the task status and hang the application.
An failure caused by the above issue here: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44062/consoleFull
Since `receivers` is an unbounded `LinkedBlockingQueue`, we can just use `LinkedBlockingQueue.offer` to resolve this issue.
Author: zsxwing <zsxwing@gmail.com>
Closes#9198 from zsxwing/dont-interrupt-send.
There's a lot of duplication between SortShuffleManager and UnsafeShuffleManager. Given that these now provide the same set of functionality, now that UnsafeShuffleManager supports large records, I think that we should replace SortShuffleManager's serialized shuffle implementation with UnsafeShuffleManager's and should merge the two managers together.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8829 from JoshRosen/consolidate-sort-shuffle-implementations.
Correct the logic to return `HDFSCacheTaskLocation` instance when the input `str` is a in memory location.
Author: zhichao.li <zhichao.li@intel.com>
Closes#9096 from zhichao-li/uselessBranch.
I was looking at this code and found the documentation to be insufficient. I added more documentation, and refactored some relevant code path slightly to improve encapsulation. There are more that I want to do, but I want to get these changes in before doing more work.
My goal is to reduce exposing internal fields directly in ShuffleMapStage to improve encapsulation. After this change, DAGScheduler no longer directly writes outputLocs. There are still 3 places that reads outputLocs directly, but we can change those later.
Author: Reynold Xin <rxin@databricks.com>
Closes#9175 from rxin/stage-cleanup.
`transient` annotations on class parameters (not case class parameters or vals) causes compilation errors during compilation with Scala 2.11.
I understand that transient *parameters* make no sense, however I don't quite understand why the 2.10 compiler accepted them.
Note: in case it is preferred to keep the annotations in case someone would in the future want to redefine them as vals, it would also be possible to just add `val` after the annotation, e.g. `class Foo(transient x: Int)` becomes `class Foo(transient private val x: Int)`.
I chose to remove the annotation as it also reduces needles clutter, however please feel free to tell me if you prefer the second option and I'll update the PR
Author: Jakob Odersky <jodersky@gmail.com>
Closes#9126 from jodersky/sbt-scala-2.11.
I also added some information to container-failure error msgs about what host they failed on, which would have helped me identify the problem that lead me to this JIRA and PR sooner.
Author: Ryan Williams <ryan.blake.williams@gmail.com>
Closes#9147 from ryan-williams/dyn-exec-failures.
This is my own original work and I license this to the project under the project's open source license
Author: Chris Bannister <chris.bannister@swiftkey.com>
Author: Chris Bannister <chris.bannister@swiftkey.net>
Closes#8358 from Zariel/mesos-local-dir.
JIRA: https://issues.apache.org/jira/browse/SPARK-11051
When a `RDD` is materialized and checkpointed, its partitions and dependencies are cleared. If we allow local checkpointing on it and assign `LocalRDDCheckpointData` to its `checkpointData`. Next time when the RDD is materialized again, the error will be thrown.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#9072 from viirya/no-localcheckpoint-after-checkpoint.
Because the registration RPC was not really an RPC, but a bunch of
disconnected messages, it was possible for other messages to be
sent before the reply to the registration arrived, and that would
confuse the Worker. Especially in local-cluster mode, the worker was
succeptible to receiving an executor request before it received a
message from the master saying registration succeeded.
On top of the above, the change also fixes a ClassCastException when
the registration fails, which also affects the executor registration
protocol. Because the `ask` is issued with a specific return type,
if the error message (of a different type) was returned instead, the
code would just die with an exception. This is fixed by having a common
base trait for these reply messages.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9138 from vanzin/SPARK-11131.
Mesos has a feature for linking to frameworks running on top of Mesos
from the Mesos WebUI. This commit enables Spark to make use of this
feature so one can directly visit the running Spark WebUIs from the
Mesos WebUI.
Author: ph <ph@plista.com>
Closes#9135 from philipphoffmann/SPARK-11129.
Its classdoc actually says; "NOTE: DO NOT USE this class outside of Spark. It is intended as an internal utility."
Author: Reynold Xin <rxin@databricks.com>
Closes#9155 from rxin/private-logging-trait.
Switched from deprecated org.apache.hadoop.fs.permission.AccessControlException to org.apache.hadoop.security.AccessControlException.
Author: gweidner <gweidner@us.ibm.com>
Closes#9144 from gweidner/SPARK-11109.
Some json parsers are not closed. parser in JacksonParser#parseJson, for example.
Author: navis.ryu <navis@apache.org>
Closes#9130 from navis/SPARK-11124.
#9084 uncovered that many tests that test spilling don't actually spill. This is a follow-up patch to fix that to ensure our unit tests actually catch potential bugs in spilling. The size of this patch is inflated by the refactoring of `ExternalSorterSuite`, which had a lot of duplicate code and logic.
Author: Andrew Or <andrew@databricks.com>
Closes#9124 from andrewor14/spilling-tests.
If the heartbeat receiver kills executors (and new ones are not registered to replace them), the idle timeout for the old executors will be lost (and then change a total number of executors requested by Driver), So new ones will be not to asked to replace them.
For example, executorsPendingToRemove=Set(1), and executor 2 is idle timeout before a new executor is asked to replace executor 1. Then driver kill executor 2, and sending RequestExecutors to AM. But executorsPendingToRemove=Set(1,2), So AM doesn't allocate a executor to replace 1.
see: https://github.com/apache/spark/pull/8668
Author: KaiXinXiaoLei <huleilei1@huawei.com>
Author: huleilei <huleilei1@huawei.com>
Closes#8945 from KaiXinXiaoLei/pendingexecutor.
Internal accumulators don't write the internal flag to event log. So on the history server Web UI, all accumulators are not internal. This causes incorrect peak execution memory and unwanted accumulator table displayed on the stage page.
To fix it, I add the "internal" property of AccumulableInfo when writing the event log.
Author: Carson Wang <carson.wang@intel.com>
Closes#9061 from carsonwang/accumulableBug.
Restrict tasks (of job) to only 1 to ensure that the causing Exception asserted for job failure is the deliberately thrown DAGSchedulerSuiteDummyException intended, not an UnsupportedOperationException from any second/subsequent tasks that can propagate from a race condition during code execution.
Author: shellberg <sah@zepler.org>
Closes#9076 from shellberg/shellberg-DAGSchedulerSuite-misbehavedResultHandlerTest-patch-1.
A few more changes:
1. Renamed IDVerifier -> RpcEndpointVerifier
2. Renamed NettyRpcAddress -> RpcEndpointAddress
3. Simplified NettyRpcHandler a bit by removing the connection count tracking. This is OK because I now force spark.shuffle.io.numConnectionsPerPeer to 1
4. Reduced spark.rpc.connect.threads to 64. It would be great to eventually remove this extra thread pool.
5. Minor cleanup & documentation.
Author: Reynold Xin <rxin@databricks.com>
Closes#9112 from rxin/SPARK-11096.
should pick into spark 1.5.2 also.
https://issues.apache.org/jira/browse/SPARK-10619
looks like this was broken by commit: fb1d06fc24 (diff-b8adb646ef90f616c34eb5c98d1ebd16)
It looks like somethings were change to use the UIUtils.listingTable but executor page wasn't converted so when it removed sortable from the UIUtils. TABLE_CLASS_NOT_STRIPED it broke this page.
Simply add the sortable tag back in and it fixes both active UI and the history server UI.
Author: Tom Graves <tgraves@yahoo-inc.com>
Closes#9101 from tgravescs/SPARK-10619.
This patch unifies the memory management of the storage and execution regions such that either side can borrow memory from each other. When memory pressure arises, storage will be evicted in favor of execution. To avoid regressions in cases where storage is crucial, we dynamically allocate a fraction of space for storage that execution cannot evict. Several configurations are introduced:
- **spark.memory.fraction (default 0.75)**: fraction of the heap space used for execution and storage. The lower this is, the more frequently spills and cached data eviction occur. The purpose of this config is to set aside memory for internal metadata, user data structures, and imprecise size estimation in the case of sparse, unusually large records.
- **spark.memory.storageFraction (default 0.5)**: size of the storage region within the space set aside by `spark.memory.fraction`. Cached data may only be evicted if total storage exceeds this region.
- **spark.memory.useLegacyMode (default false)**: whether to use the memory management that existed in Spark 1.5 and before. This is mainly for backward compatibility.
For a detailed description of the design, see [SPARK-10000](https://issues.apache.org/jira/browse/SPARK-10000). This patch builds on top of the `MemoryManager` interface introduced in #9000.
Author: Andrew Or <andrew@databricks.com>
Closes#9084 from andrewor14/unified-memory-manager.
Two points in this PR:
1. Originally thought was that a named R list is assumed to be a struct in SerDe. But this is problematic because some R functions will implicitly generate named lists that are not intended to be a struct when transferred by SerDe. So SerDe clients have to explicitly mark a names list as struct by changing its class from "list" to "struct".
2. SerDe is in the Spark Core module, and data of StructType is represented as GenricRow which is defined in Spark SQL module. SerDe can't import GenricRow as in maven build Spark SQL module depends on Spark Core module. So this PR adds a registration hook in SerDe to allow SQLUtils in Spark SQL module to register its functions for serialization and deserialization of StructType.
Author: Sun Rui <rui.sun@intel.com>
Closes#8794 from sun-rui/SPARK-10051.
I'm going through the implementation right now for post-doc review. Adding more comments and renaming things as I go through them.
I also want to write higher level documentation about how the whole thing works -- but those will come in other pull requests.
Author: Reynold Xin <rxin@databricks.com>
Closes#9091 from rxin/rpc-review.
https://issues.apache.org/jira/browse/SPARK-10858
The issue here is that in resolveURI we default to calling new File(path).getAbsoluteFile().toURI(). But if the path passed in already has a # in it then File(path) will think that is supposed to be part of the actual file path and not a fragment so it changes # to %23. Then when we try to parse that later in Client as a URI it doesn't recognize there is a fragment.
so to fix we just check if there is a fragment, still create the File like we did before and then add the fragment back on.
Author: Tom Graves <tgraves@yahoo-inc.com>
Closes#9035 from tgravescs/SPARK-10858.
This change adds an API that encapsulates information about an app
launched using the library. It also creates a socket-based communication
layer for apps that are launched as child processes; the launching
application listens for connections from launched apps, and once
communication is established, the channel can be used to send updates
to the launching app, or to send commands to the child app.
The change also includes hooks for local, standalone/client and yarn
masters.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#7052 from vanzin/SPARK-8673.
This patch introduces a `MemoryManager` that is the central arbiter of how much memory to grant to storage and execution. This patch is primarily concerned only with refactoring while preserving the existing behavior as much as possible.
This is the first step away from the existing rigid separation of storage and execution memory, which has several major drawbacks discussed on the [issue](https://issues.apache.org/jira/browse/SPARK-10956). It is the precursor of a series of patches that will attempt to address those drawbacks.
Author: Andrew Or <andrew@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>
Author: andrewor14 <andrew@databricks.com>
Closes#9000 from andrewor14/memory-manager.
In YARN client mode, when the AM connects to the driver, it may be the case
that the driver never needs to send a message back to the AM (i.e., no
dynamic allocation or preemption). This triggers an issue in the netty rpc
backend where no disconnection event is sent to endpoints, and the AM never
exits after the driver shuts down.
The real fix is too complicated, so this is a quick hack to unblock YARN
client mode until we can work on the real fix. It forces the driver to
send a message to the AM when the AM registers, thus establishing that
connection and enabling the disconnection event when the driver goes
away.
Also, a minor side issue: when the executor is shutting down, it needs
to send an "ack" back to the driver when using the netty rpc backend; but
that "ack" wasn't being sent because the handler was shutting down the rpc
env before returning. So added a change to delay the shutdown a little bit,
allowing the ack to be sent back.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9021 from vanzin/SPARK-10987.
The `self` method returns null when called from the constructor;
instead, registration should happen in the `onStart` method, at
which point the `self` reference has already been initialized.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9005 from vanzin/SPARK-10964.
This makes YARN containers behave like all other processes launched by
Spark, which launch with a default perm gen size of 256m unless
overridden by the user (or not needed by the vm).
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#8970 from vanzin/SPARK-10916.
This PR just reverted 02144d6745 to remerge #6457 and also included the commits in #8905.
Author: zsxwing <zsxwing@gmail.com>
Closes#8944 from zsxwing/SPARK-6028.
Compatibility between history server script and functionality
The history server has its argument parsing class in HistoryServerArguments. However, this doesn't get involved in the start-history-server.sh codepath where the $0 arg is assigned to spark.history.fs.logDirectory and all other arguments discarded (e.g --property-file.)
This stops the other options being usable from this script
Author: Joshi <rekhajoshm@gmail.com>
Author: Rekha Joshi <rekhajoshm@gmail.com>
Closes#8758 from rekhajoshm/SPARK-10317.
The utilities such as Substring#substringBinarySQL and BinaryPrefixComparator#computePrefix for binary data are put together in ByteArray for easy-to-read.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes#8122 from maropu/CleanUpForBinaryType.
The YARN backend doesn't like when user code calls System.exit, since it cannot know the exit status and thus cannot set an appropriate final status for the application.
This PR remove the usage of system.exit to exit the RRunner. Instead, when the R process running an SparkR script returns an exit code other than 0, throws SparkUserAppException which will be caught by ApplicationMaster and ApplicationMaster knows it failed. For other failures, throws SparkException.
Author: Sun Rui <rui.sun@intel.com>
Closes#8938 from sun-rui/SPARK-10851.
Fix the following issues in StandaloneDynamicAllocationSuite:
1. It should not assume master and workers start in order
2. It should not assume master and workers get ready at once
3. It should not assume the application is already registered with master after creating SparkContext
4. It should not access Master.app and idToApp which are not thread safe
The changes includes:
* Use `eventually` to wait until master and workers are ready to fix 1 and 2
* Use `eventually` to wait until the application is registered with master to fix 3
* Use `askWithRetry[MasterStateResponse](RequestMasterState)` to get the application info to fix 4
Author: zsxwing <zsxwing@gmail.com>
Closes#8914 from zsxwing/fix-StandaloneDynamicAllocationSuite.
In the course of https://issues.apache.org/jira/browse/LEGAL-226 it came to light that the guidance at http://www.apache.org/dev/licensing-howto.html#permissive-deps means that permissively-licensed dependencies has a different interpretation than we (er, I) had been operating under. "pointer ... to the license within the source tree" specifically means a copy of the license within Spark's distribution, whereas at the moment, Spark's LICENSE has a pointer to the project's license in the other project's source tree.
The remedy is simply to inline all such license references (i.e. BSD/MIT licenses) or include their text in "licenses" subdirectory and point to that.
Along the way, we can also treat other BSD/MIT licenses, whose text has been inlined into LICENSE, in the same way.
The LICENSE file can continue to provide a helpful list of BSD/MIT licensed projects and a pointer to their sites. This would be over and above including license text in the distro, which is the essential thing.
Author: Sean Owen <sowen@cloudera.com>
Closes#8919 from srowen/SPARK-10833.
While this is likely not a huge issue for real production systems, for test systems which may setup a Spark Context and tear it down and stand up a Spark Context with a different master (e.g. some local mode & some yarn mode) tests this cane be an issue. Discovered during work on spark-testing-base on Spark 1.4.1, but seems like the logic that triggers it is present in master (see SparkHadoopUtil object). A valid work around for users encountering this issue is to fork a different JVM, however this can be heavy weight.
```
[info] SampleMiniClusterTest:
[info] Exception encountered when attempting to run a suite with class name: com.holdenkarau.spark.testing.SampleMiniClusterTest *** ABORTED ***
[info] java.lang.ClassCastException: org.apache.spark.deploy.SparkHadoopUtil cannot be cast to org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
[info] at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$.get(YarnSparkHadoopUtil.scala:163)
[info] at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:257)
[info] at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:561)
[info] at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:115)
[info] at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
[info] at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
[info] at org.apache.spark.SparkContext.<init>(SparkContext.scala:497)
[info] at com.holdenkarau.spark.testing.SharedMiniCluster$class.setup(SharedMiniCluster.scala:186)
[info] at com.holdenkarau.spark.testing.SampleMiniClusterTest.setup(SampleMiniClusterTest.scala:26)
[info] at com.holdenkarau.spark.testing.SharedMiniCluster$class.beforeAll(SharedMiniCluster.scala:103)
```
Author: Holden Karau <holden@pigscanfly.ca>
Closes#8911 from holdenk/SPARK-10812-spark-hadoop-util-support-switching-to-yarn.
This makes two changes:
- Allow reduce tasks to fetch multiple map output partitions -- this is a pretty small change to HashShuffleFetcher
- Move shuffle locality computation out of DAGScheduler and into ShuffledRDD / MapOutputTracker; this was needed because the code in DAGScheduler wouldn't work for RDDs that fetch multiple map output partitions from each reduce task
I also added an AdaptiveSchedulingSuite that creates RDDs depending on multiple map output partitions.
Author: Matei Zaharia <matei@databricks.com>
Closes#8844 from mateiz/spark-9852.
The DiskBlockObjectWriter constructor took a BlockId parameter but never used it. As part of some general cleanup in these interfaces, this patch refactors its constructor to eliminate this parameter.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8871 from JoshRosen/disk-block-object-writer-blockid-cleanup.
This patch reverts most of the changes in a previous fix#8827.
The real cause of the issue is that in `TungstenAggregate`'s prepare method we only reserve 1 page, but later when we switch to sort-based aggregation we try to acquire 1 page AND a pointer array. The longer-term fix should be to reserve also the pointer array, but for now ***we will simply not track the pointer array***. (Note that elsewhere we already don't track the pointer array, e.g. [here](a18208047f/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java (L88)))
Note: This patch reuses the unit test added in #8827 so it doesn't show up in the diff.
Author: Andrew Or <andrew@databricks.com>
Closes#8888 from andrewor14/dont-track-pointer-array.
Python DataFrame.head/take now requires scanning all the partitions. This pull request changes them to delegate the actual implementation to Scala DataFrame (by calling DataFrame.take).
This is more of a hack for fixing this issue in 1.5.1. A more proper fix is to change executeCollect and executeTake to return InternalRow rather than Row, and thus eliminate the extra round-trip conversion.
Author: Reynold Xin <rxin@databricks.com>
Closes#8876 from rxin/SPARK-10731.
This patch refactors Python UDF handling:
1. Extract the per-partition Python UDF calling logic from PythonRDD into a PythonRunner. PythonRunner itself expects iterator as input/output, and thus has no dependency on RDD. This way, we can use PythonRunner directly in a mapPartitions call, or in the future in an environment without RDDs.
2. Use PythonRunner in Spark SQL's BatchPythonEvaluation.
3. Updated BatchPythonEvaluation to only use its input once, rather than twice. This should fix Python UDF performance regression in Spark 1.5.
There are a number of small cleanups I wanted to do when I looked at the code, but I kept most of those out so the diff looks small.
This basically implements the approach in https://github.com/apache/spark/pull/8833, but with some code moving around so the correctness doesn't depend on the inner workings of Spark serialization and task execution.
Author: Reynold Xin <rxin@databricks.com>
Closes#8835 from rxin/python-iter-refactor.
The current shuffle code has an interface named ShuffleReader with only one implementation, HashShuffleReader. This naming is confusing, since the same read path code is used for both sort- and hash-based shuffle. This patch addresses this by renaming HashShuffleReader to BlockStoreShuffleReader.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8825 from JoshRosen/shuffle-reader-cleanup.
If we cache the InputFormat, all tasks on the same executor will share it.
Some InputFormat is thread safety, but some are not, such as HiveHBaseTableInputFormat. If tasks share a non thread safe InputFormat, unexpected error may be occurs.
To avoid it, I think we should delete the input format caching.
Author: xutingjun <xutingjun@huawei.com>
Author: meiyoula <1039320815@qq.com>
Author: Xutingjun <xutingjun@huawei.com>
Closes#7918 from XuTingjun/cached_inputFormat.
In ```RUtils.sparkRPackagePath()``` we
1. Call ``` sys.props("spark.submit.deployMode")``` which returns null if ```spark.submit.deployMode``` is not suet
2. Call ``` sparkConf.get("spark.submit.deployMode")``` which throws ```NoSuchElementException``` if ```spark.submit.deployMode``` is not set. This patch simply passes a default value ("cluster") for ```spark.submit.deployMode```.
cc rxin
Author: Hossein <hossein@databricks.com>
Closes#8832 from falaki/SPARK-10711.
The job group, and job descriptions information is passed through thread local properties, and get inherited by child threads. In case of spark streaming, the streaming jobs inherit these properties from the thread that called streamingContext.start(). This may not make sense.
1. Job group: This is mainly used for cancelling a group of jobs together. It does not make sense to cancel streaming jobs like this, as the effect will be unpredictable. And its not a valid usecase any way, to cancel a streaming context, call streamingContext.stop()
2. Job description: This is used to pass on nice text descriptions for jobs to show up in the UI. The job description of the thread that calls streamingContext.start() is not useful for all the streaming jobs, as it does not make sense for all of the streaming jobs to have the same description, and the description may or may not be related to streaming.
The solution in this PR is meant for the Spark master branch, where local properties are inherited by cloning the properties. The job group and job description in the thread that starts the streaming scheduler are explicitly removed, so that all the subsequent child threads does not inherit them. Also, the starting is done in a new child thread, so that setting the job group and description for streaming, does not change those properties in the thread that called streamingContext.start().
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#8781 from tdas/SPARK-10649.
Track pending tasks by partition ID instead of Task objects.
Before this change, failure & retry could result in a case where a stage got submitted before the map output from its dependencies get registered. This was due to an error in the condition for registering map outputs.
Author: hushan[胡珊] <hushan@xiaomi.com>
Author: Imran Rashid <irashid@cloudera.com>
Closes#7699 from squito/SPARK-5259.
I noticed only one block manager registered with master in an unsuccessful build (https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop2.2,label=spark-test/3534/)
```
15/09/16 13:02:30.981 pool-1-thread-1-ScalaTest-running-BroadcastSuite INFO SparkContext: Running Spark version 1.6.0-SNAPSHOT
...
15/09/16 13:02:38.133 sparkDriver-akka.actor.default-dispatcher-19 INFO BlockManagerMasterEndpoint: Registering block manager localhost:48196 with 530.3 MB RAM, BlockManagerId(0, localhost, 48196)
```
In addition, the first block manager needed 7+ seconds to start. But the test expected 2 block managers so it failed.
However, there was no exception in this log file. So I checked a successful build (https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/3536/AMPLAB_JENKINS_BUILD_PROFILE=hadoop2.2,label=spark-test/) and it needed 4-5 seconds to set up the local cluster:
```
15/09/16 18:11:27.738 sparkWorker1-akka.actor.default-dispatcher-5 INFO Worker: Running Spark version 1.6.0-SNAPSHOT
...
15/09/16 18:11:30.838 sparkDriver-akka.actor.default-dispatcher-20 INFO BlockManagerMasterEndpoint: Registering block manager localhost:54202 with 530.3 MB RAM, BlockManagerId(1, localhost, 54202)
15/09/16 18:11:32.112 sparkDriver-akka.actor.default-dispatcher-20 INFO BlockManagerMasterEndpoint: Registering block manager localhost:32955 with 530.3 MB RAM, BlockManagerId(0, localhost, 32955)
```
In this build, the first block manager needed only 3+ seconds to start.
Comparing these two builds, I guess it's possible that the local cluster in `BroadcastSuite` cannot be ready in 10 seconds if the Jenkins worker is busy. So I just increased the timeout to 60 seconds to see if this can fix the issue.
Author: zsxwing <zsxwing@gmail.com>
Closes#8813 from zsxwing/fix-BroadcastSuite.
It does not make much sense to set `spark.shuffle.spill` or `spark.sql.planner.externalSort` to false: I believe that these configurations were initially added as "escape hatches" to guard against bugs in the external operators, but these operators are now mature and well-tested. In addition, these configurations are not handled in a consistent way anymore: SQL's Tungsten codepath ignores these configurations and will continue to use spilling operators. Similarly, Spark Core's `tungsten-sort` shuffle manager does not respect `spark.shuffle.spill=false`.
This pull request removes these configurations, adds warnings at the appropriate places, and deletes a large amount of code which was only used in code paths that did not support spilling.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8831 from JoshRosen/remove-ability-to-disable-spilling.
When `TungstenAggregation` hits memory pressure, it switches from hash-based to sort-based aggregation in-place. However, in the process we try to allocate the pointer array for writing to the new `UnsafeExternalSorter` *before* actually freeing the memory from the hash map. This lead to the following exception:
```
java.io.IOException: Could not acquire 65536 bytes of memory
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.initializeForWriting(UnsafeExternalSorter.java:169)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:220)
at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:126)
at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435)
```
Author: Andrew Or <andrew@databricks.com>
Closes#8827 from andrewor14/allocate-pointer-array.
This patch attempts to fix the Hadoop Configuration thread safety issue for NewHadoopRDD in the same way SPARK-2546 fixed the issue for HadoopRDD.
Author: Mingyu Kim <mkim@palantir.com>
Closes#8763 from mingyukim/mkim/SPARK-10611.
When we start HiveThriftServer, we will start SparkContext first, then start HiveServer2, if we kill application while HiveServer2 is starting then SparkContext will stop successfully, but SparkSubmit process can not exit.
Author: linweizhong <linweizhong@huawei.com>
Closes#7853 from Sephiroth-Lin/SPARK-9522.
This pull request is to address the JIRA SPARK-10172 (History Server web UI gets messed up when sorting on any column).
The content of the table gets messed up due to the rowspan attribute of the table data(cell) during sorting.
The current table sort library used in SparkUI (sorttable.js) doesn't support/handle cells(td) with rowspans.
The fix will disable the table sort in the web UI, when there are jobs listed with multiple attempts.
Author: Josiah Samuel <josiah_sams@in.ibm.com>
Closes#8506 from josiahsams/SPARK-10172.
1. Support collecting data of MapType from DataFrame.
2. Support data of MapType in createDataFrame.
Author: Sun Rui <rui.sun@intel.com>
Closes#8711 from sun-rui/SPARK-10050.
Set `X-Frame-Options: SAMEORIGIN` to protect against frame-related vulnerability
Author: Sean Owen <sowen@cloudera.com>
Closes#8745 from srowen/SPARK-10589.
When speculative execution is enabled, consider a scenario where the authorized committer of a particular output partition fails during the OutputCommitter.commitTask() call. In this case, the OutputCommitCoordinator is supposed to release that committer's exclusive lock on committing once that task fails. However, due to a unit mismatch (we used task attempt number in one place and task attempt id in another) the lock will not be released, causing Spark to go into an infinite retry loop.
This bug was masked by the fact that the OutputCommitCoordinator does not have enough end-to-end tests (the current tests use many mocks). Other factors contributing to this bug are the fact that we have many similarly-named identifiers that have different semantics but the same data types (e.g. attemptNumber and taskAttemptId, with inconsistent variable naming which makes them difficult to distinguish).
This patch adds a regression test and fixes this bug by always using task attempt numbers throughout this code.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8544 from JoshRosen/SPARK-10381.
Remove return statements in RDD.takeSample and wrap it withScope
Author: vinodkc <vinod.kc.in@gmail.com>
Author: vinodkc <vinodkc@users.noreply.github.com>
Author: Vinod K C <vinod.kc@huawei.com>
Closes#8730 from vinodkc/fix_takesample_return.
*Note: this is for master branch only.* The fix for branch-1.5 is at #8721.
The query execution ID is currently passed from a thread to its children, which is not the intended behavior. This led to `IllegalArgumentException: spark.sql.execution.id is already set` when running queries in parallel, e.g.:
```
(1 to 100).par.foreach { _ =>
sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count()
}
```
The cause is `SparkContext`'s local properties are inherited by default. This patch adds a way to exclude keys we don't want to be inherited, and makes SQL go through that code path.
Author: Andrew Or <andrew@databricks.com>
Closes#8710 from andrewor14/concurrent-sql-executions.
This patch adds support for submitting map stages in a DAG individually so that we can make downstream decisions after seeing statistics about their output, as part of SPARK-9850. I also added more comments to many of the key classes in DAGScheduler. By itself, the patch is not super useful except maybe to switch between a shuffle and broadcast join, but with the other subtasks of SPARK-9850 we'll be able to do more interesting decisions.
The main entry point is SparkContext.submitMapStage, which lets you run a map stage and see stats about the map output sizes. Other stats could also be collected through accumulators. See AdaptiveSchedulingSuite for a short example.
Author: Matei Zaharia <matei@databricks.com>
Closes#8180 from mateiz/spark-9851.
This is a follow-up patch to #8723. I missed one case there.
Author: Andrew Or <andrew@databricks.com>
Closes#8727 from andrewor14/fix-threading-suite.
Move .java files in `src/main/scala` to `src/main/java` root, except for `package-info.java` (to stay next to package.scala)
Author: Sean Owen <sowen@cloudera.com>
Closes#8736 from srowen/SPARK-10576.
This is a follow-up of https://github.com/apache/spark/pull/8317.
When speculation is enabled, there may be multiply tasks writing to the same path. Generally it's OK as we will write to a temporary directory first and only one task can commit the temporary directory to target path.
However, when we use direct output committer, tasks will write data to target path directly without temporary directory. This causes problems like corrupted data. Please see [PR comment](https://github.com/apache/spark/pull/8191#issuecomment-131598385) for more details.
Unfortunately, we don't have a simple flag to tell if a output committer will write to temporary directory or not, so for safety, we have to disable any customized output committer when `speculation` is true.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8687 from cloud-fan/direct-committer.
This is a followup to #8499 which adds a Scalastyle rule to mandate the use of SparkHadoopUtil's JobContext accessor methods and fixes the existing violations.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8521 from JoshRosen/SPARK-10330-part2.
Fix a few Java API test style issues: unused generic types, exceptions, wrong assert argument order
Author: Sean Owen <sowen@cloudera.com>
Closes#8706 from srowen/SPARK-10547.
When throwing an IllegalArgumentException in SnappyCompressionCodec.init, chain the existing exception. This allows potentially important debugging info to be passed to the user.
Manual testing shows the exception chained properly, and the test suite still looks fine as well.
This contribution is my original work and I license the work to the project under the project's open source license.
Author: Daniel Imfeld <daniel@danielimfeld.com>
Closes#8725 from dimfeld/dimfeld-patch-1.
This commit ensures if an assertion fails within a thread, it will ultimately fail the test. Otherwise we end up potentially masking real bugs by not propagating assertion failures properly.
Author: Andrew Or <andrew@databricks.com>
Closes#8723 from andrewor14/fix-threading-suite.
See this thread for background:
http://search-hadoop.com/m/q3RTt0rWvIkHAE81
We should check the range of partition Id and provide meaningful message through exception.
Alternatively, we can use abs() and modulo to force the partition Id into legitimate range. However, expectation is that user should correct the logic error in his / her code.
Author: tedyu <yuzhihong@gmail.com>
Closes#8703 from tedyu/master.
ShuffleManager implementations are currently not given type information for
the key, value and combiner classes. Serialization of shuffle objects relies
on objects being JavaSerializable, with methods defined for reading/writing
the object or, alternatively, serialization via Kryo which uses reflection.
Serialization systems like Avro, Thrift and Protobuf generate classes with
zero argument constructors and explicit schema information
(e.g. IndexedRecords in Avro have get, put and getSchema methods).
By serializing the key, value and combiner class names in ShuffleDependency,
shuffle implementations will have access to schema information when
registerShuffle() is called.
Author: Matt Massie <massie@cs.berkeley.edu>
Closes#7403 from massie/shuffle-classtags.
this PR :
1. Enhance reflection in RBackend. Automatically matching a Java array to Scala Seq when finding methods. Util functions like seq(), listToSeq() in R side can be removed, as they will conflict with the Serde logic that transferrs a Scala seq to R side.
2. Enhance the SerDe to support transferring a Scala seq to R side. Data of ArrayType in DataFrame
after collection is observed to be of Scala Seq type.
3. Support ArrayType in createDataFrame().
Author: Sun Rui <rui.sun@intel.com>
Closes#8458 from sun-rui/SPARK-10049.
spark.scheduler.minRegisteredResourcesRatio configuration parameter works for YARN mode but not for Mesos Coarse grained mode.
If the parameter specified default value of 0 will be set for spark.scheduler.minRegisteredResourcesRatio in base class and this method will always return true.
There are no existing test for YARN mode too. Hence not added test for the same.
Author: Akash Mishra <akash.mishra20@gmail.com>
Closes#8672 from SleepyThread/master.
This is a regression introduced in #4960, this commit fixes it and adds a test.
tnachen andrewor14 please review, this should be an easy one.
Author: Iulian Dragos <jaguarul@gmail.com>
Closes#8653 from dragos/issue/mesos/fine-grained-maxExecutorCores.
The architecture is that, in YARN mode, if the driver detects that an executor has disconnected, it asks the ApplicationMaster why the executor died. If the ApplicationMaster is aware that the executor died because of preemption, all tasks associated with that executor are not marked as failed. The executor
is still removed from the driver's list of available executors, however.
There's a few open questions:
1. Should standalone mode have a similar "get executor loss reason" as well? I localized this change as much as possible to affect only YARN, but there could be a valid case to differentiate executor losses in standalone mode as well.
2. I make a pretty strong assumption in YarnAllocator that getExecutorLossReason(executorId) will only be called once per executor id; I do this so that I can remove the metadata from the in-memory map to avoid object accumulation. It's not clear if I'm being overly zealous to save space, however.
cc vanzin specifically for review because it collided with some earlier YARN scheduling work.
cc JoshRosen because it's similar to output commit coordination we did in the past
cc andrewor14 for our discussion on how to get executor exit codes and loss reasons
Author: mcheah <mcheah@palantir.com>
Closes#8007 from mccheah/feature/preemption-handling.
Data Spill with UnsafeRow causes assert failure.
```
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:165)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75)
at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
```
To reproduce that with code (thanks andrewor14):
```scala
bin/spark-shell --master local
--conf spark.shuffle.memoryFraction=0.005
--conf spark.shuffle.sort.bypassMergeThreshold=0
sc.parallelize(1 to 2 * 1000 * 1000, 10)
.map { i => (i, i) }.toDF("a", "b").groupBy("b").avg().count()
```
Author: Cheng Hao <hao.cheng@intel.com>
Closes#8635 from chenghao-intel/unsafe_spill.
This PR is based on #8383 , thanks to viirya
JIRA: https://issues.apache.org/jira/browse/SPARK-9730
This patch adds the Full Outer Join support for SortMergeJoin. A new class SortMergeFullJoinScanner is added to scan rows from left and right iterators. FullOuterIterator is simply a wrapper of type RowIterator to consume joined rows from SortMergeFullJoinScanner.
Closes#8383
Author: Liang-Chi Hsieh <viirya@appier.com>
Author: Davies Liu <davies@databricks.com>
Closes#8579 from davies/smj_fullouter.
The bulk of the changes are on `transient` annotation on class parameter. Often the compiler doesn't generate a field for this parameters, so the the transient annotation would be unnecessary.
But if the class parameter are used in methods, then fields are created. So it is safer to keep the annotations.
The remainder are some potential bugs, and deprecated syntax.
Author: Luc Bourlier <luc.bourlier@typesafe.com>
Closes#8433 from skyluc/issue/sbt-2.11.
We introduced the Netty network module for shuffle in Spark 1.2, and has turned it on by default for 3 releases. The old ConnectionManager is difficult to maintain. If we merge the patch now, by the time it is released, it would be 1 yr for which ConnectionManager is off by default. It's time to remove it.
Author: Reynold Xin <rxin@databricks.com>
Closes#8161 from rxin/SPARK-9767.
Support running pyspark with cluster mode on Mesos!
This doesn't upload any scripts, so if running in a remote Mesos requires the user to specify the script from a available URI.
Author: Timothy Chen <tnachen@gmail.com>
Closes#8349 from tnachen/mesos_python.
Note: this is not intended to be in Spark 1.5!
This patch rewrites some code in the `DAGScheduler` to make it more readable. In particular
- there were blocks of code that are unnecessary and removed for simplicity
- there were abstractions that are unnecessary and made the code hard to navigate
- other minor changes
Author: Andrew Or <andrew@databricks.com>
Closes#8217 from andrewor14/dag-scheduler-readability and squashes the following commits:
57abca3 [Andrew Or] Move comment back into if case
574fb1e [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-scheduler-readability
64a9ed2 [Andrew Or] Remove unnecessary code + minor code rewrites
It's not supported yet so we should error with a clear message.
Author: Andrew Or <andrew@databricks.com>
Closes#8590 from andrewor14/mesos-cluster-r-guard.
[SPARK-9591](https://issues.apache.org/jira/browse/SPARK-9591)
When we getting the broadcast variable, we can fetch the block form several location,but now when connecting the lost blockmanager(idle for enough time removed by driver when using dynamic resource allocate and so on) will cause task fail,and the worse case will cause the job fail.
Author: jeanlyn <jeanlyn92@gmail.com>
Closes#7927 from jeanlyn/catch_exception.
This contribution is my original work and I license the work to the project under the project's open source license.
Author: Pat Shields <yeoldefortran@gmail.com>
Closes#7979 from pashields/env-loading-on-driver.
Spark gives an error message and does not show the output when a field of the result DataFrame contains characters in CJK.
I changed SerDe.scala in order that Spark support Unicode characters when writes a string to R.
Author: CHOIJAEHONG <redrock07@naver.com>
Closes#7494 from CHOIJAEHONG1/SPARK-8951.
This is pretty minor, just trying to improve the readability of `DAGSchedulerSuite`, I figure every bit helps. Before whenever I read this test, I never knew what "should work" and "should be ignored" really meant -- this adds some asserts & updates comments to make it more clear. Also some reformatting per a suggestion from markhamstra on https://github.com/apache/spark/pull/7699
Author: Imran Rashid <irashid@cloudera.com>
Closes#8434 from squito/SPARK-10247.
Added fetchUpToMaxBytes() to prevent having to update both code blocks when a change is made.
Author: Evan Racah <ejracah@gmail.com>
Closes#8514 from eracah/master.
Added numPartitions(evaluate: Boolean) to RDD. With "evaluate=true" the method is same with "partitions.length". With "evaluate=false", it checks checked-out or already evaluated partitions in the RDD to get number of partition. If it's not those cases, returns -1. RDDInfo.partitionNum calls numPartition only when it's accessed.
Author: navis.ryu <navis@apache.org>
Closes#7127 from navis/SPARK-8707.
The ```Stage``` class now tracks whether there were a sufficient number of consecutive failures of that stage to trigger an abort.
To avoid an infinite loop of stage retries, we abort the job completely after 4 consecutive stage failures for one stage. We still allow more than 4 consecutive stage failures if there is an intervening successful attempt for the stage, so that in very long-lived applications, where a stage may get reused many times, we don't abort the job after failures that have been recovered from successfully.
I've added test cases to exercise the most obvious scenarios.
Author: Ilya Ganelin <ilya.ganelin@capitalone.com>
Closes#5636 from ilganeli/SPARK-5945.
To correctly isolate applications, when requests to read shuffle data
arrive at the shuffle service, proper authorization checks need to
be performed. This change makes sure that only the application that
created the shuffle data can read from it.
Such checks are only enabled when "spark.authenticate" is enabled,
otherwise there's no secure way to make sure that the client is really
who it says it is.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#8218 from vanzin/SPARK-10004.
SPARK-4223.
Currently we support setting view and modify acls but you have to specify a list of users. It would be nice to support * meaning all users have access.
Manual tests to verify that: "*" works for any user in:
a. Spark ui: view and kill stage. Done.
b. Spark history server. Done.
c. Yarn application killing. Done.
Author: zhuol <zhuol@yahoo-inc.com>
Closes#8398 from zhuoliu/4223.
In SMJ, the first ExternalSorter could consume all the memory before spilling, then the second can not even acquire the first page.
Before we have a better memory allocator, SMJ should call prepare() before call any compute() of it's children.
cc rxin JoshRosen
Author: Davies Liu <davies@databricks.com>
Closes#8511 from davies/smj_memory.
JIRA Issue: https://issues.apache.org/jira/browse/SPARK-10184
Change `cumWeight > target` to `cumWeight >= target` in `RangePartitioner.determineBounds` method to make the output partitions more balanced.
Author: ihainan <ihainan72@gmail.com>
Closes#8397 from ihainan/opt_for_rangepartitioner.
This change aims at speeding up the dev cycle a little bit, by making
sure that all tests behave the same w.r.t. where the code to be tested
is loaded from. Namely, that means that tests don't rely on the assembly
anymore, rather loading all needed classes from the build directories.
The main change is to make sure all build directories (classes and test-classes)
are added to the classpath of child processes when running tests.
YarnClusterSuite required some custom code since the executors are run
differently (i.e. not through the launcher library, like standalone and
Mesos do).
I also found a couple of tests that could leak a SparkContext on failure,
and added code to handle those.
With this patch, it's possible to run the following command from a clean
source directory and have all tests pass:
mvn -Pyarn -Phadoop-2.4 -Phive-thriftserver install
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#7629 from vanzin/SPARK-9284.
Remove obsolete warning about dynamic allocation not working with cached RDDs
See discussion in https://issues.apache.org/jira/browse/SPARK-10295
Author: Sean Owen <sowen@cloudera.com>
Closes#8489 from srowen/SPARK-10295.
This PR:
1. supports transferring arbitrary nested array from JVM to R side in SerDe;
2. based on 1, collect() implemenation is improved. Now it can support collecting data of complex types
from a DataFrame.
Author: Sun Rui <rui.sun@intel.com>
Closes#8276 from sun-rui/SPARK-10048.
Replace `JavaConversions` implicits with `JavaConverters`
Most occurrences I've seen so far are necessary conversions; a few have been avoidable. None are in critical code as far as I see, yet.
Author: Sean Owen <sowen@cloudera.com>
Closes#8033 from srowen/SPARK-9613.
The peak execution memory metric was introduced in SPARK-8735. That was before Tungsten was enabled by default, so it assumed that `spark.sql.unsafe.enabled` must be explicitly set to true. The result is that the memory is not displayed by default.
Author: Andrew Or <andrew@databricks.com>
Closes#8345 from andrewor14/show-memory-default.
https://issues.apache.org/jira/browse/SPARK-9439
In general, Yarn apps should be robust to NodeManager restarts. However, if you run spark with the external shuffle service on, after a NM restart all shuffles fail, b/c the shuffle service has lost some state with info on each executor. (Note the shuffle data is perfectly fine on disk across a NM restart, the problem is we've lost the small bit of state that lets us *find* those files.)
The solution proposed here is that the external shuffle service can write out its state to leveldb (backed by a local file) every time an executor is added. When running with yarn, that file is in the NM's local dir. Whenever the service is started, it looks for that file, and if it exists, it reads the file and re-registers all executors there.
Nothing is changed in non-yarn modes with this patch. The service is not given a place to save the state to, so it operates the same as before. This should make it easy to update other cluster managers as well, by just supplying the right file & the equivalent of yarn's `initializeApplication` -- I'm not familiar enough with those modes to know how to do that.
Author: Imran Rashid <irashid@cloudera.com>
Closes#7943 from squito/leveldb_external_shuffle_service_NM_restart and squashes the following commits:
0d285d3 [Imran Rashid] review feedback
70951d6 [Imran Rashid] Merge branch 'master' into leveldb_external_shuffle_service_NM_restart
5c71c8c [Imran Rashid] save executor to db before registering; style
2499c8c [Imran Rashid] explicit dependency on jackson-annotations
795d28f [Imran Rashid] review feedback
81f80e2 [Imran Rashid] Merge branch 'master' into leveldb_external_shuffle_service_NM_restart
594d520 [Imran Rashid] use json to serialize application executor info
1a7980b [Imran Rashid] version
8267d2a [Imran Rashid] style
e9f99e8 [Imran Rashid] cleanup the handling of bad dbs a little
9378ba3 [Imran Rashid] fail gracefully on corrupt leveldb files
acedb62 [Imran Rashid] switch to writing out one record per executor
79922b7 [Imran Rashid] rely on yarn to call stopApplication; assorted cleanup
12b6a35 [Imran Rashid] save registered executors when apps are removed; add tests
c878fbe [Imran Rashid] better explanation of shuffle service port handling
694934c [Imran Rashid] only open leveldb connection once per service
d596410 [Imran Rashid] store executor data in leveldb
59800b7 [Imran Rashid] Files.move in case renaming is unsupported
32fe5ae [Imran Rashid] Merge branch 'master' into external_shuffle_service_NM_restart
d7450f0 [Imran Rashid] style
f729e2b [Imran Rashid] debugging
4492835 [Imran Rashid] lol, dont use a PrintWriter b/c of scalastyle checks
0a39b98 [Imran Rashid] Merge branch 'master' into external_shuffle_service_NM_restart
55f49fc [Imran Rashid] make sure the service doesnt die if the registered executor file is corrupt; add tests
245db19 [Imran Rashid] style
62586a6 [Imran Rashid] just serialize the whole executors map
bdbbf0d [Imran Rashid] comments, remove some unnecessary changes
857331a [Imran Rashid] better tests & comments
bb9d1e6 [Imran Rashid] formatting
bdc4b32 [Imran Rashid] rename
86e0cb9 [Imran Rashid] for tests, shuffle service finds an open port
23994ff [Imran Rashid] style
7504de8 [Imran Rashid] style
a36729c [Imran Rashid] cleanup
efb6195 [Imran Rashid] proper unit test, and no longer leak if apps stop during NM restart
dd93dc0 [Imran Rashid] test for shuffle service w/ NM restarts
d596969 [Imran Rashid] cleanup imports
0e9d69b [Imran Rashid] better names
9eae119 [Imran Rashid] cleanup lots of duplication
1136f44 [Imran Rashid] test needs to have an actual shuffle
0b588bd [Imran Rashid] more fixes ...
ad122ef [Imran Rashid] more fixes
5e5a7c3 [Imran Rashid] fix build
c69f46b [Imran Rashid] maybe working version, needs tests & cleanup ...
bb3ba49 [Imran Rashid] minor cleanup
36127d3 [Imran Rashid] wip
b9d2ced [Imran Rashid] incomplete setup for external shuffle service tests
so constructors parameters and public fields can be annotated. rxin MechCoder
Author: Xiangrui Meng <meng@databricks.com>
Closes#8344 from mengxr/SPARK-10140.2.
Currently the spark applications can be queued to the Mesos cluster dispatcher, but when multiple jobs are in queue we don't handle removing jobs from the buffer correctly while iterating and causes null pointer exception.
This patch copies the buffer before iterating them, so exceptions aren't thrown when the jobs are removed.
Author: Timothy Chen <tnachen@gmail.com>
Closes#8322 from tnachen/fix_cluster_mode.
I added lots of Column functinos into SparkR. And I also added `rand(seed: Int)` and `randn(seed: Int)` in Scala. Since we need such APIs for R integer type.
### JIRA
[[SPARK-9856] Add expression functions into SparkR whose params are complicated - ASF JIRA](https://issues.apache.org/jira/browse/SPARK-9856)
Author: Yu ISHIKAWA <yuu.ishikawa@gmail.com>
Closes#8264 from yu-iskw/SPARK-9856-3.
Add warnings according to SPARK-8949 in `SparkContext`
- warnings in scaladoc
- log warnings when preferred locations feature is used through `SparkContext`'s constructor
However I didn't found any documentation reference of this feature. Please direct me if you know any reference to this feature.
Author: Han JU <ju.han.felix@gmail.com>
Closes#7874 from darkjh/SPARK-8949.
Small changes
- Renamed conf spark.streaming.backpressure.{enable --> enabled}
- Change Java Deprecated annotations to Scala deprecated annotation with more information.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#8299 from tdas/SPARK-9967.
In Scala, `Seq.fill` always seems to return a List. Accessing a list by index is an O(N) operation. Thus, the following code will be really slow (~10 seconds on my machine):
```scala
val numItems = 100000
val s = Seq.fill(numItems)(1)
for (i <- 0 until numItems) s(i)
```
It turns out that we had a loop like this in DAGScheduler code, although it's a little tricky to spot. In `getPreferredLocsInternal`, there's a call to `getCacheLocs(rdd)(partition)`. The `getCacheLocs` call returns a Seq. If this Seq is a List and the RDD contains many partitions, then indexing into this list will cost O(partitions). Thus, when we loop over our tasks to compute their individual preferred locations we implicitly perform an N^2 loop, reducing scheduling throughput.
This patch fixes this by replacing `Seq` with `Array`.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8178 from JoshRosen/dagscheduler-perf.
The fix for SPARK-7736 introduced a race where a port value of "-1"
could be passed down to the pyspark process, causing it to fail to
connect back to the JVM. This change adds code to fix that race.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#8258 from vanzin/SPARK-7736.
it might be a typo introduced at the first moment or some leftover after some renaming......
the name of the method accessing the index file is called `getBlockData` now (not `getBlockLocation` as indicated in the comments)
Author: CodingCat <zhunansjtu@gmail.com>
Closes#8238 from CodingCat/minor_1.
The YARN backend doesn't like when user code calls `System.exit`,
since it cannot know the exit status and thus cannot set an
appropriate final status for the application.
So, for pyspark, avoid that call and instead throw an exception with
the exit code. SparkSubmit handles that exception and exits with
the given exit code, while YARN uses the exit code as the failure
code for the Spark app.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#7751 from vanzin/SPARK-9416.
The shuffle locality patch made the DAGScheduler aware of shuffle data,
but for RDDs that have both narrow and shuffle dependencies, it can
cause them to place tasks based on the shuffle dependency instead of the
narrow one. This case is common in iterative join-based algorithms like
PageRank and ALS, where one RDD is hash-partitioned and one isn't.
Author: Matei Zaharia <matei@databricks.com>
Closes#8220 from mateiz/shuffle-loc-fix.
Tiny modification to a few comments ```sbt publishLocal``` work again.
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#8209 from hvanhovell/SPARK-9980.
Deprecate NIO ConnectionManager in Spark 1.5.0, before removing it in Spark 1.6.0.
Author: Reynold Xin <rxin@databricks.com>
Closes#8162 from rxin/SPARK-9934.
Detailed exception log can be seen in [SPARK-9877](https://issues.apache.org/jira/browse/SPARK-9877), the problem is when creating `StandaloneRestServer`, `self` (`masterEndpoint`) is null. So this fix is creating `StandaloneRestServer` when `self` is available.
Author: jerryshao <sshao@hortonworks.com>
Closes#8127 from jerryshao/SPARK-9877.
In these tests, we use a custom listener and we assert on fields in the stage / task completion events. However, these events are posted in a separate thread so they're not guaranteed to be posted in time. This commit fixes this flakiness through a job end registration callback.
Author: Andrew Or <andrew@databricks.com>
Closes#8176 from andrewor14/fix-accumulator-suite.
When a stage failed and another stage was resubmitted with only part of partitions to compute, all the tasks failed with error message: java.util.NoSuchElementException: key not found: peakExecutionMemory.
This is because the internal accumulators are not properly initialized for this stage while other codes assume the internal accumulators always exist.
Author: Carson Wang <carson.wang@intel.com>
Closes#8090 from carsonwang/SPARK-9809.
Modified type of ShuffleMapStage.numAvailableOutputs from Long to Int
Author: Neelesh Srinivas Salian <nsalian@cloudera.com>
Closes#8183 from nssalian/SPARK-9923.
Currently, pageSize of TungstenSort is calculated from driver.memory, it should use executor.memory instead.
Also, in the worst case, the safeFactor could be 4 (because of rounding), increase it to 16.
cc rxin
Author: Davies Liu <davies@databricks.com>
Closes#8175 from davies/page_size.
This patch add a thread-safe lookup for BytesToBytseMap, and use that in broadcasted HashedRelation.
Author: Davies Liu <davies@databricks.com>
Closes#8151 from davies/safeLookup.
I think that we should pass additional configuration flags to disable the driver UI and Master REST server in SparkSubmitSuite and HiveSparkSubmitSuite. This might cut down on port-contention-related flakiness in Jenkins.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8124 from JoshRosen/disable-ui-in-sparksubmitsuite.
Refactor Utils class and create ShutdownHookManager.
NOTE: Wasn't able to run /dev/run-tests on windows machine.
Manual tests were conducted locally using custom log4j.properties file with Redis appender and logstash formatter (bundled in the fat-jar submitted to spark)
ex:
log4j.rootCategory=WARN,console,redis
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.spark.graphx.Pregel=INFO
log4j.appender.redis=com.ryantenney.log4j.FailoverRedisAppender
log4j.appender.redis.endpoints=hostname:port
log4j.appender.redis.key=mykey
log4j.appender.redis.alwaysBatch=false
log4j.appender.redis.layout=net.logstash.log4j.JSONEventLayoutV1
Author: michellemay <mlemay@gmail.com>
Closes#8109 from michellemay/SPARK-9826.
… allocation are set. Now, dynamic allocation is set to false when num-executors is explicitly specified as an argument. Consequently, executorAllocationManager in not initialized in the SparkContext.
Author: Niranjan Padmanabhan <niranjan.padmanabhan@cloudera.com>
Closes#7657 from neurons/SPARK-9092.
This is the sister patch to #8011, but for aggregation.
In a nutshell: create the `TungstenAggregationIterator` before computing the parent partition. Internally this creates a `BytesToBytesMap` which acquires a page in the constructor as of this patch. This ensures that the aggregation operator is not starved since we reserve at least 1 page in advance.
rxin yhuai
Author: Andrew Or <andrew@databricks.com>
Closes#8038 from andrewor14/unsafe-starve-memory-agg.
This is based on KaiXinXiaoLei's changes in #7716.
The issue is that when someone calls `sc.killExecutor("1")` on the same executor twice quickly, then the executor target will be adjusted downwards by 2 instead of 1 even though we're only actually killing one executor. In certain cases where we don't adjust the target back upwards quickly, we'll end up with jobs hanging.
This is a common danger because there are many places where this is called:
- `HeartbeatReceiver` kills an executor that has not been sending heartbeats
- `ExecutorAllocationManager` kills an executor that has been idle
- The user code might call this, which may interfere with the previous callers
While it's not clear whether this fixes SPARK-9745, fixing this potential race condition seems like a strict improvement. I've added a regression test to illustrate the issue.
Author: Andrew Or <andrew@databricks.com>
Closes#8078 from andrewor14/da-double-kill.
This allows clients to retrieve the original exception from the
cause field of the SparkException that is thrown by the driver.
If the original exception is not in fact Serializable then it will
not be returned, but the message and stacktrace will be. (All Java
Throwables implement the Serializable interface, but this is no
guarantee that a particular implementation can actually be
serialized.)
Author: Tom White <tom@cloudera.com>
Closes#7014 from tomwhite/propagate-user-exceptions.
Some users like to download additional files in their sandbox that they can refer to from their spark program, or even later mount these files to another directory.
Author: Timothy Chen <tnachen@gmail.com>
Closes#7195 from tnachen/mesos_files.
To reproduce the issue, go to the stage page and click DAG Visualization once, then go to the job page to show the job DAG visualization. You will only see the first stage of the job.
Root cause: the java script use local storage to remember your selection. Once you click the stage DAG visualization, the local storage set `expand-dag-viz-arrow-stage` to true. When you go to the job page, the js checks `expand-dag-viz-arrow-stage` in the local storage first and will try to show stage DAG visualization on the job page.
To fix this, I set an id to the DAG span to differ job page and stage page. In the js code, we check the id and local storage together to make sure we show the correct DAG visualization.
Author: Carson Wang <carson.wang@intel.com>
Closes#8104 from carsonwang/SPARK-9426.
The peak execution memory is not correct because it shows the sum of finished tasks' values when a task finishes.
This PR fixes it by using the update value rather than the accumulator value.
Author: zsxwing <zsxwing@gmail.com>
Closes#8121 from zsxwing/SPARK-9829.
`InternalAccumulator.create` doesn't call `registerAccumulatorForCleanup` to register itself with ContextCleaner, so `WeakReference`s for these accumulators in `Accumulators.originals` won't be removed.
This PR added `registerAccumulatorForCleanup` for internal accumulators to avoid the memory leak.
Author: zsxwing <zsxwing@gmail.com>
Closes#8108 from zsxwing/internal-accumulators-leak.
PlatformDependent.UNSAFE is way too verbose.
Author: Reynold Xin <rxin@databricks.com>
Closes#8094 from rxin/SPARK-9815 and squashes the following commits:
229b603 [Reynold Xin] [SPARK-9815] Rename PlatformDependent.UNSAFE -> Platform.
RUtils.isRInstalled throws an exception if R is not installed,
instead of returning false. Fix that.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#8008 from vanzin/SPARK-9710 and squashes the following commits:
df72d8c [Marcelo Vanzin] [SPARK-9710] [test] Fix RPackageUtilsSuite when R is not available.
This was introduced in #7599
cc rxin brkyvz
Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
Closes#8055 from shivaram/spark-packages-repo-fix and squashes the following commits:
890f306 [Shivaram Venkataraman] Remove test case
51d69ee [Shivaram Venkataraman] Add test case for --packages without --repository
c02e0b4 [Shivaram Venkataraman] Use Option instead of Some for Ivy repos
In order for this to work, I had to disable gap sampling.
Author: Reynold Xin <rxin@databricks.com>
Closes#8040 from rxin/SPARK-9752 and squashes the following commits:
f9e248c [Reynold Xin] Fix the test case for real this time.
adbccb3 [Reynold Xin] Fixed test case.
589fb23 [Reynold Xin] Merge branch 'SPARK-9752' of github.com:rxin/spark into SPARK-9752
55ccddc [Reynold Xin] Fixed core test.
78fa895 [Reynold Xin] [SPARK-9752][SQL] Support UnsafeRow in Sample operator.
c9e7112 [Reynold Xin] [SPARK-9752][SQL] Support UnsafeRow in Sample operator.
The issue only happens if `spark.executor.cores` is not set and executor memory is set to a high value.
For example, if we have a worker with 4G and 10 cores and we set `spark.executor.memory` to 3G, then only 1 core is assigned to the executor. The correct number should be 10 cores.
I've added a unit test to illustrate the issue.
Author: Carson Wang <carson.wang@intel.com>
Closes#8017 from carsonwang/SPARK-9731 and squashes the following commits:
d09ec48 [Carson Wang] Fix code style
86b651f [Carson Wang] Simplify the code
943cc4c [Carson Wang] fix scheduling correct cores to executors
The original code that this test tests is removed in 9270bd06fd. It was ignored shortly before that so we never caught it. This patch re-enables the test and adds the code necessary to make it pass.
JoshRosen yhuai
Author: Andrew Or <andrew@databricks.com>
Closes#8015 from andrewor14/SPARK-9674 and squashes the following commits:
225eac2 [Andrew Or] Merge branch 'master' of github.com:apache/spark into SPARK-9674
8c24209 [Andrew Or] Fix NPE
e541d64 [Andrew Or] Track aggregation memory for both sort and hash
0be3a42 [Andrew Or] Fix test
This PR adds SQLMetric/SQLMetricParam/SQLMetricValue to specialize accumulators to avoid boxing. All SQL metrics should use these classes rather than `Accumulator`.
Author: zsxwing <zsxwing@gmail.com>
Closes#7996 from zsxwing/sql-accu and squashes the following commits:
14a5f0a [zsxwing] Address comments
367ca23 [zsxwing] Use localValue directly to avoid changing Accumulable
42f50c3 [zsxwing] Add SQLMetric to specialize accumulators to avoid boxing
This patch follows exactly #7891 (except testing)
Author: Davies Liu <davies@databricks.com>
Closes#8005 from davies/larger_record and squashes the following commits:
f9c4aff [Davies Liu] address comments
9de5c72 [Davies Liu] support records larger than page size in UnsafeShuffleExternalSorter
Previously, we use 64MB as the default page size, which was way too big for a lot of Spark applications (especially for single node).
This patch changes it so that the default page size, if unset by the user, is determined by the number of cores available and the total execution memory available.
Author: Reynold Xin <rxin@databricks.com>
Closes#8012 from rxin/pagesize and squashes the following commits:
16f4756 [Reynold Xin] Fixed failing test.
5afd570 [Reynold Xin] private...
0d5fb98 [Reynold Xin] Update default value.
674a6cd [Reynold Xin] Address review feedback.
dc00e05 [Reynold Xin] Merge with master.
73ebdb6 [Reynold Xin] [SPARK-9700] Pick default page size more intelligently.
Someone may use the Spark core jar in the maven repo with hadoop 1. SPARK-2075 has already resolved the compatibility issue to support it. But `SparkHadoopMapRedUtil.commitTask` broke it recently.
This PR uses Reflection to call `TaskAttemptContext.getTaskAttemptID` to fix the compatibility issue.
Author: zsxwing <zsxwing@gmail.com>
Closes#6599 from zsxwing/SPARK-8057 and squashes the following commits:
f7a343c [zsxwing] Remove the redundant import
6b7f1af [zsxwing] Call TaskAttemptContext.getTaskAttemptID using Reflection
The issue is that a task may run multiple sorts, and the sorts run by the child operator (i.e. parent RDD) may acquire all available memory such that other sorts in the same task do not have enough to proceed. This manifests itself in an `IOException("Unable to acquire X bytes of memory")` thrown by `UnsafeExternalSorter`.
The solution is to reserve a page in each sorter in the chain before computing the child operator's (parent RDD's) partitions. This requires us to use a new special RDD that does some preparation before computing the parent's partitions.
Author: Andrew Or <andrew@databricks.com>
Closes#8011 from andrewor14/unsafe-starve-memory and squashes the following commits:
35b69a4 [Andrew Or] Simplify test
0b07782 [Andrew Or] Minor: update comments
5d5afdf [Andrew Or] Merge branch 'master' of github.com:apache/spark into unsafe-starve-memory
254032e [Andrew Or] Add tests
234acbd [Andrew Or] Reserve a page in sorter when preparing each partition
b889e08 [Andrew Or] MapPartitionsWithPreparationRDD
A small performance optimization – we don't need to generate a Tuple2 and then immediately discard the key. We also don't need an extra wrapper from InterruptibleIterator.
Author: Reynold Xin <rxin@databricks.com>
Closes#8000 from rxin/SPARK-9692 and squashes the following commits:
1d4d0b3 [Reynold Xin] [SPARK-9692] Remove SqlNewHadoopRDD's generated Tuple2 and InterruptibleIterator.
Spark should not mess with the permissions of directories created
by the cluster manager. Here, by setting the block manager dir
permissions to 700, the shuffle service (running as the YARN user)
wouldn't be able to serve shuffle files created by applications.
Also, the code to protect the local app dir was missing in standalone's
Worker; that has been now added. Since all processes run as the same
user in standalone, `chmod 700` should not cause problems.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#7966 from vanzin/SPARK-9645 and squashes the following commits:
6e07b31 [Marcelo Vanzin] Protect the app dir in standalone mode.
384ba6a [Marcelo Vanzin] [SPARK-9645] [yarn] [core] Allow shuffle service to read shuffle files.
In some receivers, instead of using the default `BlockGenerator` in `ReceiverSupervisorImpl`, custom generator with their custom listeners are used for reliability (see [`ReliableKafkaReceiver`](https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala#L99) and [updated `KinesisReceiver`](https://github.com/apache/spark/pull/7825/files)). These custom generators do not receive rate updates. This PR modifies the code to allow custom `BlockGenerator`s to be created through the `ReceiverSupervisorImpl` so that they can be kept track and rate updates can be applied.
In the process, I did some simplification, and de-flaki-fication of some rate controller related tests. In particular.
- Renamed `Receiver.executor` to `Receiver.supervisor` (to match `ReceiverSupervisor`)
- Made `RateControllerSuite` faster (by increasing batch interval) and less flaky
- Changed a few internal API to return the current rate of block generators as Long instead of Option\[Long\] (was inconsistent at places).
- Updated existing `ReceiverTrackerSuite` to test that custom block generators get rate updates as well.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#7913 from tdas/SPARK-9556 and squashes the following commits:
41d4461 [Tathagata Das] fix scala style
eb9fd59 [Tathagata Das] Updated kinesis receiver
d24994d [Tathagata Das] Updated BlockGeneratorSuite to use manual clock in BlockGenerator
d70608b [Tathagata Das] Updated BlockGenerator with states and proper synchronization
f6bd47e [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-9556
31da173 [Tathagata Das] Fix bug
12116df [Tathagata Das] Add BlockGeneratorSuite
74bd069 [Tathagata Das] Fix style
989bb5c [Tathagata Das] Made BlockGenerator fail is used after stop, and added better unit tests for it
3ff618c [Tathagata Das] Fix test
b40eff8 [Tathagata Das] slight refactoring
f0df0f1 [Tathagata Das] Scala style fixes
51759cb [Tathagata Das] Refactored rate controller tests and added the ability to update rate of any custom block generator
This pull request adds a destructive iterator to BytesToBytesMap. When used, the iterator frees pages as it traverses them. This is part of the effort to avoid starving when we have more than one operators that can exhaust memory.
This is based on #7924, but fixes a bug there (Don't use destructive iterator in UnsafeKVExternalSorter).
Closes#7924.
Author: Liang-Chi Hsieh <viirya@appier.com>
Author: Reynold Xin <rxin@databricks.com>
Closes#8003 from rxin/map-destructive-iterator and squashes the following commits:
6b618c3 [Reynold Xin] Don't use destructive iterator in UnsafeKVExternalSorter.
a7bd8ec [Reynold Xin] Merge remote-tracking branch 'viirya/destructive_iter' into map-destructive-iterator
7652083 [Liang-Chi Hsieh] For comments: add destructiveIterator(), modify unit test, remove code block.
4a3e9de [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into destructive_iter
581e9e3 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into destructive_iter
f0ff783 [Liang-Chi Hsieh] No need to free last page.
9e9d2a3 [Liang-Chi Hsieh] Add a destructive iterator for BytesToBytesMap.
This PR has the following three small fixes.
1. UnsafeKVExternalSorter does not use 0 as the initialSize to create an UnsafeInMemorySorter if its BytesToBytesMap is empty.
2. We will not not spill a InMemorySorter if it is empty.
3. We will not add a SpillReader to a SpillMerger if this SpillReader is empty.
JIRA: https://issues.apache.org/jira/browse/SPARK-9611
Author: Yin Huai <yhuai@databricks.com>
Closes#7948 from yhuai/unsafeEmptyMap and squashes the following commits:
9727abe [Yin Huai] Address Josh's comments.
34b6f76 [Yin Huai] 1. UnsafeKVExternalSorter does not use 0 as the initialSize to create an UnsafeInMemorySorter if its BytesToBytesMap is empty. 2. Do not spill a InMemorySorter if it is empty. 3. Do not add spill to SpillMerger if this spill is empty.
First, it's probably a bad idea to call generated Scala methods
from Java. In this case, the method being called wasn't actually
"Utils.createTempDir()", but actually the method that returns the
first default argument to the actual createTempDir method, which
is just the location of java.io.tmpdir; meaning that all tests in
the class were using the same temp dir, and thus affecting each
other.
Second, spillingOccursInResponseToMemoryPressure was not writing
enough records to actually cause a spill.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#7970 from vanzin/SPARK-9651 and squashes the following commits:
74d357f [Marcelo Vanzin] Clean up temp dir on test tear down.
a64f36a [Marcelo Vanzin] [SPARK-9651] Fix UnsafeExternalSorterSuite.
```
Error Message
Failed to bind to: /127.0.0.1:7093: Service 'sparkMaster' failed after 16 retries!
Stacktrace
java.net.BindException: Failed to bind to: /127.0.0.1:7093: Service 'sparkMaster' failed after 16 retries!
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393)
at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
at scala.util.Try$.apply(Try.scala:161)
```
Author: Andrew Or <andrew@databricks.com>
Closes#7968 from andrewor14/fix-master-flaky-test and squashes the following commits:
fcc42ef [Andrew Or] Randomize port
This PR includes the following changes:
### SPARK-8862: Add basic instrumentation to each SparkPlan operator
A SparkPlan can override `def accumulators: Map[String, Accumulator[_]]` to expose its metrics that can be displayed in UI. The UI will use them to track the updates and show them in the web page in real-time.
### SparkSQLExecution and SQLSparkListener
`SparkSQLExecution.withNewExecutionId` will set `spark.sql.execution.id` to the local properties so that we can use it to track all jobs that belong to the same query.
SQLSparkListener is a listener to track all accumulator updates of all tasks for a query. It receives them from heartbeats can the UI can query them in real-time.
When running a query, `SQLSparkListener.onExecutionStart` will be called. When a query is finished, `SQLSparkListener.onExecutionEnd` will be called. And the Spark jobs with the same execution id will be tracked and stored with this query.
`SQLSparkListener` has to store all accumulator updates for tasks separately. When a task fails and starts to retry, we need to drop the old accumulator updates. Because we can not revert our changes to an accumulator, we have to maintain these accumulator updates by ourselves so as to drop accumulator updates for a failed task.
### SPARK-8862: A new SQL tab
Includes two pages:
#### A page for all DataFrame/SQL queries
It will show the running, completed and failed queries in 3 tables. It also displays the jobs and their links for a query in each row.
#### A detail page for a DataFrame/SQL query
In this page, it also shows the SparkPlan metrics in real-time. Run a long-running query, such as
```
val testData = sc.parallelize((1 to 1000000).map(i => (i, i.toString))).toDF()
testData.select($"_1").filter($"_1" < 1000).foreach(_ => Thread.sleep(60))
```
and you will see the metrics keep updating in real-time.
<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/7774)
<!-- Reviewable:end -->
Author: zsxwing <zsxwing@gmail.com>
Closes#7774 from zsxwing/sql-ui and squashes the following commits:
5a2bc99 [zsxwing] Remove UISeleniumSuite and its dependency
57d4cd2 [zsxwing] Use VisibleForTesting annotation
cc1c736 [zsxwing] Add SparkPlan.trackNumOfRowsEnabled to make subclasses easy to track the number of rows; fix the issue that the "save" action cannot collect metrics
3771ab0 [zsxwing] Register SQL metrics accmulators
3a101c0 [zsxwing] Change prepareCalled's type to AtomicBoolean for thread-safety
b8d5605 [zsxwing] Make prepare idempotent; call children's prepare in SparkPlan.prepare; change doPrepare to def
4ed11a1 [zsxwing] var -> val
332639c [zsxwing] Ignore UISeleniumSuite and SQLListenerSuite."no memory leak" because of SPARK-9580
bb52359 [zsxwing] Address other commens in SQLListener
c4d0f5d [zsxwing] Move newPredicate out of the iterator loop
957473c [zsxwing] Move STATIC_RESOURCE_DIR to object SQLTab
7ab4816 [zsxwing] Make SparkPlan accumulator API private[sql]
dae195e [zsxwing] Fix the code style and comments
3a66207 [zsxwing] Ignore irrelevant accumulators
b8484a1 [zsxwing] Merge branch 'master' into sql-ui
9406592 [zsxwing] Implement the SparkPlan viz
4ebce68 [zsxwing] Add SparkPlan.prepare to support BroadcastHashJoin to run background work in parallel
ca1811f [zsxwing] Merge branch 'master' into sql-ui
fef6fc6 [zsxwing] Fix a corner case
25f335c [zsxwing] Fix the code style
6eae828 [zsxwing] SQLSparkListener -> SQLListener; SparkSQLExecutionUIData -> SQLExecutionUIData; SparkSQLExecution -> SQLExecution
822af75 [zsxwing] Add SQLSparkListenerSuite and fix the issue about onExecutionEnd and onJobEnd
6be626f [zsxwing] Add UISeleniumSuite to test UI
d02a24d [zsxwing] Make ExecutionPage private
23abf73 [zsxwing] [SPARK-8862][SPARK-8862][SQL] Add basic instrumentation to each SparkPlan operator and add a new SQL tab
The current implementation of UnsafeExternalSort uses NoOpPrefixComparator for binary-typed data.
So, we need to add BinaryPrefixComparator in PrefixComparators.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes#7676 from maropu/BinaryTypePrefixComparator and squashes the following commits:
fe6f31b [Takeshi YAMAMURO] Apply comments
d943c04 [Takeshi YAMAMURO] Add a codegen'd entry for BinaryType in SortPrefix
ecf3ac5 [Takeshi YAMAMURO] Support BinaryType in PrefixComparator
shivaram cafreeman Could you please help me in testing this out? Exposing and running `rPackageBuilder` from inside the shell works, but for some reason, I can't get it to work during Spark Submit. It just starts relaunching Spark Submit.
For testing, you may use the R branch with [sbt-spark-package](https://github.com/databricks/sbt-spark-package). You can call spPackage, and then pass the jar using `--jars`.
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#7139 from brkyvz/r-submit and squashes the following commits:
0de384f [Burak Yavuz] remove unused imports 2
d253708 [Burak Yavuz] removed unused imports
6603d0d [Burak Yavuz] addressed comments
4258ffe [Burak Yavuz] merged master
ddfcc06 [Burak Yavuz] added zipping test
3a1be7d [Burak Yavuz] don't zip
77995df [Burak Yavuz] fix URI
ac45527 [Burak Yavuz] added zipping of all libs
e6bf7b0 [Burak Yavuz] add println ignores
1bc5554 [Burak Yavuz] add assumes for tests
9778e03 [Burak Yavuz] addressed comments
b42b300 [Burak Yavuz] merged master
ffd134e [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into r-submit
d867756 [Burak Yavuz] add apache header
eff5ba1 [Burak Yavuz] ready for review
8838edb [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into r-submit
e5b5a06 [Burak Yavuz] added doc
bb751ce [Burak Yavuz] fix null bug
0226768 [Burak Yavuz] fixed issues
8810beb [Burak Yavuz] R packages support
https://issues.apache.org/jira/browse/SPARK-9602
Although we have hidden Akka behind RPC interface, I found that the Akka/Actor-related comments are still spreading everywhere. To make it consistent, we shall remove "actor"/"akka" words from the comments...
Author: CodingCat <zhunansjtu@gmail.com>
Closes#7936 from CodingCat/SPARK-9602 and squashes the following commits:
e8296a3 [CodingCat] remove actor words from comments
This patch extends UnsafeExternalSorter to support records larger than the page size. The basic strategy is the same as in #7762: store large records in their own overflow pages.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7891 from JoshRosen/large-records-in-sql-sorter and squashes the following commits:
967580b [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
948c344 [Josh Rosen] Add large records tests for KV sorter.
3c17288 [Josh Rosen] Combine memory and disk cleanup into general cleanupResources() method
380f217 [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
27eafa0 [Josh Rosen] Fix page size in PackedRecordPointerSuite
a49baef [Josh Rosen] Address initial round of review comments
3edb931 [Josh Rosen] Remove accidentally-committed debug statements.
2b164e2 [Josh Rosen] Support large records in UnsafeExternalSorter.
Enable most javac lint warnings; fix a lot of build warnings. In a few cases, touch up surrounding code in the process.
I'll explain several of the changes inline in comments.
Author: Sean Owen <sowen@cloudera.com>
Closes#7862 from srowen/SPARK-9534 and squashes the following commits:
ea51618 [Sean Owen] Enable most javac lint warnings; fix a lot of build warnings. In a few cases, touch up surrounding code in the process.
We often return abstract iterator types in various sort-related classes (e.g. UnsafeKVExternalSorter). It is actually better to return a more concrete type, so the callsite uses that type and JIT can inline the iterator calls.
Author: Reynold Xin <rxin@databricks.com>
Closes#7911 from rxin/surface-concrete-type and squashes the following commits:
0422add [Reynold Xin] [SPARK-9577][SQL] Surface concrete iterator types in various sort classes.
https://issues.apache.org/jira/browse/SPARK-8416
To facilitate debugging, I made this patch with three changes:
* render the executor-thread and non executor-thread entries with different background colors
* put the executor threads on the top of the list
* sort the threads alphabetically
Author: CodingCat <zhunansjtu@gmail.com>
Closes#7808 from CodingCat/SPARK-8416 and squashes the following commits:
34fc708 [CodingCat] fix className
d7b79dd [CodingCat] lowercase threadName
d032882 [CodingCat] sort alphabetically and change the css class name
f0513b1 [CodingCat] change the color & group threads by name
2da6e06 [CodingCat] small fix
3fc9f36 [CodingCat] define classes in webui.css
8ee125e [CodingCat] highlight and put on top the executor threads in thread dumping page
While the functionality is there to exclude packages, there are no flags that allow users to exclude dependencies, in case of dependency conflicts. We should provide users with a flag to add dependency exclusions in case the packages are not resolved properly (or not available due to licensing).
The flag I added was --packages-exclude, but I'm open on renaming it. I also added property flags in case people would like to use a conf file to provide dependencies, which is possible if there is a long list of dependencies or exclusions.
cc andrewor14 vanzin pwendell
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#7599 from brkyvz/packages-exclusions and squashes the following commits:
636f410 [Burak Yavuz] addressed nits
6e54ede [Burak Yavuz] is this the culprit
b5e508e [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into packages-exclusions
154f5db [Burak Yavuz] addressed initial comments
1536d7a [Burak Yavuz] Added flags to exclude packages using --packages-exclude
Certain use cases of Spark involve RDDs with long lineages that must be truncated periodically (e.g. GraphX). The existing way of doing it is through `rdd.checkpoint()`, which is expensive because it writes to HDFS. This patch provides an alternative to truncate lineages cheaply *without providing the same level of fault tolerance*.
**Local checkpointing** writes checkpointed data to the local file system through the block manager. It is much faster than replicating to a reliable storage and provides the same semantics as long as executors do not fail. It is accessible through a new operator `rdd.localCheckpoint()` and leaves the old one unchanged. Users may even decide to combine the two and call the reliable one less frequently.
The bulk of this patch involves refactoring the checkpointing interface to accept custom implementations of checkpointing. [Design doc](https://issues.apache.org/jira/secure/attachment/12741708/SPARK-7292-design.pdf).
Author: Andrew Or <andrew@databricks.com>
Closes#7279 from andrewor14/local-checkpoint and squashes the following commits:
729600f [Andrew Or] Oops, fix tests
34bc059 [Andrew Or] Avoid computing all partitions in local checkpoint
e43bbb6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
3be5aea [Andrew Or] Address comments
bf846a6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
ab003a3 [Andrew Or] Fix compile
c2e111b [Andrew Or] Address comments
33f167a [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
e908a42 [Andrew Or] Fix tests
f5be0f3 [Andrew Or] Use MEMORY_AND_DISK as the default local checkpoint level
a92657d [Andrew Or] Update a few comments
e58e3e3 [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
4eb6eb1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
1bbe154 [Andrew Or] Simplify LocalCheckpointRDD
48a9996 [Andrew Or] Avoid traversing dependency tree + rewrite tests
62aba3f [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
db70dc2 [Andrew Or] Express local checkpointing through caching the original RDD
87d43c6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
c449b38 [Andrew Or] Fix style
4a182f3 [Andrew Or] Add fine-grained tests for local checkpointing
53b363b [Andrew Or] Rename a few more awkwardly named methods (minor)
e4cf071 [Andrew Or] Simplify LocalCheckpointRDD + docs + clean ups
4880deb [Andrew Or] Fix style
d096c67 [Andrew Or] Fix mima
172cb66 [Andrew Or] Fix mima?
e53d964 [Andrew Or] Fix style
56831c5 [Andrew Or] Add a few warnings and clear exception messages
2e59646 [Andrew Or] Add local checkpoint clean up tests
4dbbab1 [Andrew Or] Refactor CheckpointSuite to test local checkpointing
4514dc9 [Andrew Or] Clean local checkpoint files through RDD cleanups
0477eec [Andrew Or] Rename a few methods with awkward names (minor)
2e902e5 [Andrew Or] First implementation of local checkpointing
8447454 [Andrew Or] Fix tests
4ac1896 [Andrew Or] Refactor checkpoint interface for modularity
This patch builds directly on #7820, which is largely written by tnachen. The only addition is one commit for cleaning up the code. There should be no functional differences between this and #7820.
Author: Timothy Chen <tnachen@gmail.com>
Author: Andrew Or <andrew@databricks.com>
Closes#7881 from andrewor14/tim-cleanup-mesos-shuffle and squashes the following commits:
8894f7d [Andrew Or] Clean up code
2a5fa10 [Andrew Or] Merge branch 'mesos_shuffle_clean' of github.com:tnachen/spark into tim-cleanup-mesos-shuffle
fadff89 [Timothy Chen] Address comments.
e4d0f1d [Timothy Chen] Clean up external shuffle data on driver exit with Mesos.
This pull request adds a destructAndCreateExternalSorter method to UnsafeFixedWidthAggregationMap. The new method does the following:
1. Creates a new external sorter UnsafeKVExternalSorter
2. Adds all the data into an in-memory sorter, sorts them
3. Spills the sorted in-memory data to disk
This method can be used to fallback to sort-based aggregation when under memory pressure.
The pull request also includes accounting fixes from JoshRosen.
TODOs (that can be done in follow-up PRs)
- [x] Address Josh's feedbacks from #7849
- [x] More documentation and test cases
- [x] Make sure we are doing memory accounting correctly with test cases (e.g. did we release the memory in BytesToBytesMap twice?)
- [ ] Look harder at possible memory leaks and exception handling
- [ ] Randomized tester for the KV sorter as well as the aggregation map
Author: Reynold Xin <rxin@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7860 from rxin/kvsorter and squashes the following commits:
986a58c [Reynold Xin] Bug fix.
599317c [Reynold Xin] Style fix and slightly more compact code.
fe7bd4e [Reynold Xin] Bug fixes.
fd71bef [Reynold Xin] Merge remote-tracking branch 'josh/large-records-in-sql-sorter' into kvsorter-with-josh-fix
3efae38 [Reynold Xin] More fixes and documentation.
45f1b09 [Josh Rosen] Ensure that spill files are cleaned up
f6a9bd3 [Reynold Xin] Josh feedback.
9be8139 [Reynold Xin] Remove testSpillFrequency.
7cbe759 [Reynold Xin] [SPARK-9531][SQL] UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter.
ae4a8af [Josh Rosen] Detect leaked unsafe memory in UnsafeExternalSorterSuite.
52f9b06 [Josh Rosen] Detect ShuffleMemoryManager leaks in UnsafeExternalSorter.
This pull request adds a sortedIterator method to UnsafeFixedWidthAggregationMap that sorts its data in-place by the grouping key.
This is needed so we can fallback to external sorting for aggregation.
Author: Reynold Xin <rxin@databricks.com>
Closes#7849 from rxin/bytes2bytes-sorting and squashes the following commits:
75018c6 [Reynold Xin] Updated documentation.
81a8694 [Reynold Xin] [SPARK-9520][SQL] Support in-place sort in UnsafeFixedWidthAggregationMap.
Dynamic allocation is a feature that allows a Spark application to scale the number of executors up and down dynamically based on the workload. Support was first introduced in YARN since 1.2, and then extended to Mesos coarse-grained mode recently. Today, it is finally supported in standalone mode as well!
I tested this locally and it works as expected. This is WIP because unit tests are coming.
Author: Andrew Or <andrew@databricks.com>
Closes#7532 from andrewor14/standalone-da and squashes the following commits:
b3c1736 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
879e928 [Andrew Or] Add end-to-end tests for standalone dynamic allocation
accc8f6 [Andrew Or] Address comments
ee686a8 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
c0a2c02 [Andrew Or] Fix build after merge conflict
24149eb [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
2e762d6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
6832bd7 [Andrew Or] Add tests for scheduling with executor limit
a82e907 [Andrew Or] Fix comments
0a8be79 [Andrew Or] Simplify logic by removing the worker blacklist
b7742af [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
2eb5f3f [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
1334e9a [Andrew Or] Fix MiMa
32abe44 [Andrew Or] Fix style
58cb06f [Andrew Or] Privatize worker blacklist for cleanliness
42ac215 [Andrew Or] Clean up comments and rewrite code for readability
49702d1 [Andrew Or] Clean up shuffle files after application exits
80047aa [Andrew Or] First working implementation
BytesToBytesMap current encodes key/value data in the following format:
```
8B key length, key data, 8B value length, value data
```
UnsafeExternalSorter, on the other hand, encodes data this way:
```
4B record length, data
```
As a result, we cannot pass records encoded by BytesToBytesMap directly into UnsafeExternalSorter for sorting. However, if we rearrange data slightly, we can then pass the key/value records directly into UnsafeExternalSorter:
```
4B key+value length, 4B key length, key data, value data
```
Author: Reynold Xin <rxin@databricks.com>
Closes#7845 from rxin/kvsort-rebase and squashes the following commits:
5716b59 [Reynold Xin] Fixed test.
2e62ccb [Reynold Xin] Updated BytesToBytesMap's data encoding to put the key first.
a51b641 [Reynold Xin] Added a KV sorter interface.