This change seems large, but since it changes the network library's API,
most of it is just mechanically replacing `byte[]` with `ByteBuffer` and
`new byte[]` with `ByteBuffer.allocate()`.
The following are parts of the code that actually have meaningful
changes:
- The Message implementations were changed to inherit from a new
AbstractMessage that can optionally hold a reference to a body
(in the form of a ManagedBuffer); this is similar to how
ResponseWithBody worked before, except now it's not restricted
to just responses.
- The TransportFrameDecoder was pretty much rewritten to avoid copies
  as much as possible; it no longer relies on CompositeByteBuf to
  accumulate incoming data, since CompositeByteBuf has issues when
  slices are retained. The code can now create frames without resorting
  to copying bytes, except for a few bytes (containing the frame length)
  in very rare cases.
- Some minor changes in the SASL layer to convert things back to
`byte[]` since the JDK SASL API operates on those.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #9987 from vanzin/SPARK-12007.
In the previous code, `newDaemonCachedThreadPool` used a `SynchronousQueue`, which is wrong: `SynchronousQueue` is an empty queue that cannot cache any task. This patch uses a `LinkedBlockingQueue` instead, along with other fixes to make sure `newDaemonCachedThreadPool` uses at most `maxThreadNumber` threads and, beyond that, caches tasks in the `LinkedBlockingQueue`.
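A minimal sketch of the fixed construction, assuming Guava's `ThreadFactoryBuilder` for daemon threads (names are illustrative, not the exact code in Utils.scala):
```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import com.google.common.util.concurrent.ThreadFactoryBuilder

def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = {
  val threadFactory = new ThreadFactoryBuilder()
    .setDaemon(true)
    .setNameFormat(prefix + "-%d")
    .build()
  val pool = new ThreadPoolExecutor(
    // Core size must equal max size: a ThreadPoolExecutor only grows past
    // its core size once the queue is full, and LinkedBlockingQueue never fills.
    maxThreadNumber,
    maxThreadNumber,
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue[Runnable],
    threadFactory)
  // Let idle threads time out so the pool still behaves like a cached pool.
  pool.allowCoreThreadTimeOut(true)
  pool
}
```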
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #9978 from zsxwing/cached-threadpool.
This change does a couple of different things to make sure that the RpcEnv-level
code and the network library agree about the status of outstanding RPCs.
For RPCs that do not expect a reply ("RpcEnv.send"), support for one-way
messages (hello CORBA!) was added to the network layer. This is a
"fire and forget" message that does not require any state to be kept
by the TransportClient; as a result, the RpcEnv 'Ack' message is not needed
anymore.
For RPCs that do expect a reply ("RpcEnv.ask"), the network library now
returns the internal RPC id; if the RpcEnv layer decides to time out the
RPC before the network layer does, it now asks the TransportClient to
forget about the RPC, so that if the network-level timeout occurs, the
client is not killed.
As part of implementing the above, I cleaned up some of the code in the
netty rpc backend, removing types that were not necessary and factoring
out some common code. Of interest is a slight change in the exceptions
when posting messages to a stopped RpcEnv; that's mostly to avoid nasty
error messages from the local-cluster backend when shutting down, which
pollutes the terminal output.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #9917 from vanzin/SPARK-11866.
- NettyRpcEnv::openStream() now correctly propagates errors to
the read side of the pipe.
- NettyStreamManager now throws if the file being transferred does
not exist.
- The network library now correctly handles zero-sized streams.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #9941 from vanzin/SPARK-11956.
This issue was addressed in https://github.com/apache/spark/pull/5494, but the fix in that PR, while safe in the sense that it will prevent the SparkContext from shutting down, misses the actual bug. The intent of `submitMissingTasks` should be understood as "submit the Tasks that are missing for the Stage, and run them as part of the ActiveJob identified by jobId". Because of a long-standing bug, the `jobId` parameter was never being used. Instead, we were trying to use the jobId with which the Stage was created -- which may no longer exist as an ActiveJob, hence the crash reported in SPARK-6880.
The correct fix is to use the ActiveJob specified by the supplied jobId parameter, which is guaranteed to exist at the call sites of submitMissingTasks.
This fix should be applied to all maintenance branches, since it has existed since 1.0.
kayousterhout pankajarora12
Author: Mark Hamstra <markhamstra@gmail.com>
Author: Imran Rashid <irashid@cloudera.com>
Closes #6291 from markhamstra/SPARK-6880.
This is a continuation of SPARK-11761.
Andrew suggested adding this protection. See tail of https://github.com/apache/spark/pull/9741
Author: tedyu <yuzhihong@gmail.com>
Closes #9852 from tedyu/master.
This change abstracts the code that serves jars / files to executors so that
each RpcEnv can have its own implementation; the akka version uses the existing
HTTP-based file serving mechanism, while the netty version uses the new
stream support added to the network lib, which makes file transfers benefit
from the easier security configuration of the network library, and should also
reduce overhead overall.
The change includes a small fix to TransportChannelHandler so that it propagates
user events to downstream handlers.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #9530 from vanzin/SPARK-11140.
In PersistenceEngineSuite, we do not call `close()` on the PersistenceEngine at the end of the test. For the ZooKeeperPersistenceEngine, this causes us to leak a ZooKeeper client, causing the logs of unrelated tests to be periodically spammed with connection error messages from that client:
```
15/11/20 05:13:35.789 pool-1-thread-1-ScalaTest-running-PersistenceEngineSuite-SendThread(localhost:15741) INFO ClientCnxn: Opening socket connection to server localhost/127.0.0.1:15741. Will not attempt to authenticate using SASL (unknown error)
15/11/20 05:13:35.790 pool-1-thread-1-ScalaTest-running-PersistenceEngineSuite-SendThread(localhost:15741) WARN ClientCnxn: Session 0x15124ff48dd0000 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1068)
```
This patch fixes this by using a `finally` block.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #9864 from JoshRosen/close-zookeeper-client-in-tests.
This patch reduces some RPC timeouts in order to speed up the slow "AkkaUtilsSuite.remote fetch ssl on - untrusted server", which used to take two minutes to run.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #9869 from JoshRosen/SPARK-11650.
To make sure that all lineage is correctly truncated for TrackStateRDD when checkpointed.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #9831 from tdas/SPARK-11845.
SparkListenerSuite's _"onTaskGettingResult() called when result fetched remotely"_ test was extremely slow (1 to 4 minutes to run) and recently became extremely flaky, frequently failing with OutOfMemoryError.
The root cause was the fact that this was using `System.setProperty` to set the Akka frame size, which was not actually modifying the frame size. As a result, this test would allocate much more data than necessary. The fix here is to simply use SparkConf in order to configure the frame size.
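A sketch of the difference, assuming the 1.x `spark.akka.frameSize` setting (value in MB; code illustrative):
```scala
import org.apache.spark.{SparkConf, SparkContext}

// Broken: mutates a JVM property that the test's configuration never rereads.
// System.setProperty("spark.akka.frameSize", "1")

// Fixed: pass the setting through SparkConf so it actually takes effect.
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("test")
  .set("spark.akka.frameSize", "1") // small frame forces remote result fetching
val sc = new SparkContext(conf)
```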
Author: Josh Rosen <joshrosen@databricks.com>
Closes #9822 from JoshRosen/SPARK-11649.
[SPARK-6028](https://issues.apache.org/jira/browse/SPARK-6028) uses the network module to implement RPC. However, there are some configurations named with the `spark.shuffle` prefix in the network module.
This PR refactors them to make sure the user can control them in shuffle and RPC separately. The user can use `spark.rpc.*` to set the configuration for netty RPC.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #9481 from zsxwing/SPARK-10745.
https://issues.apache.org/jira/browse/SPARK-11792
The main changes include:
* Renaming `SizeEstimation` to `KnownSizeEstimation`. Hopefully the new name is more informative.
* Making `estimatedSize` return `Long` instead of `Option[Long]`.
* In `UnsafeHashedRelation`, `estimatedSize` will delegate the work to `SizeEstimator` if we have not created a `BytesToBytesMap`.
Since we will put `UnsafeHashedRelation` in the `BlockManager`, it is generally good to let it provide a more accurate size estimation. Also, if we do not put `BytesToBytesMap` directly into the `BlockManager`, I feel it is not really necessary to make `BytesToBytesMap` extend `KnownSizeEstimation`.
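A sketch of the trait's assumed shape after these changes:
```scala
// KnownSizeEstimation, post-rename (sketch): implementors report a size
// directly instead of an Option.
trait KnownSizeEstimation {
  def estimatedSize: Long // was Option[Long] on the old SizeEstimation trait
}
```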
Author: Yin Huai <yhuai@databricks.com>
Closes #9813 from yhuai/SPARK-11792-followup.
Make sure we are using the context classloader when deserializing failed TaskResults instead of the Spark classloader.
The issue is that `enqueueFailedTask` was using the incorrect classloader which results in `ClassNotFoundException`.
Adds a test in TaskResultGetterSuite that compiles a custom exception, throws it on the executor, and asserts that Spark handles the TaskResult deserialization instead of returning `UnknownReason`.
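A sketch of the classloader switch, with `serializer` and `serializedData` assumed from the surrounding code in TaskResultGetter:
```scala
// Use the context classloader, which can see user-defined exception classes,
// instead of the Spark classloader, which cannot.
val loader = Thread.currentThread.getContextClassLoader
val reason = serializer.get().deserialize[TaskEndReason](serializedData, loader)
```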
See #9367 for previous comments
See SPARK-11195 for a full repro
Author: Hurshal Patel <hpatel516@gmail.com>
Closes #9779 from choochootrain/spark-11195-master.
This PR upgrades the version of RoaringBitmap to 0.5.10 to optimize the memory layout; the bitmaps will be much smaller when most of the blocks are empty.
This PR is based on #9661 (with conflicts fixed); see all of the comments at https://github.com/apache/spark/pull/9661.
Author: Kent Yao <yaooqinn@hotmail.com>
Author: Davies Liu <davies@databricks.com>
Author: Charles Allen <charles@allen-net.com>
Closes #9746 from davies/roaring_mapstatus.
With dynamic allocation, busy executors are sometimes falsely killed: executors that still have task assignments get killed for having been idle long enough (say, 60 seconds). The root cause is that the task-launch listener event is asynchronous.
For example, some executors are being assigned tasks but have not sent out the listener notification yet, while the dynamic allocation idle timeout (e.g., 60 seconds) expires at the same time, triggering a killExecutor event:
1. The timer expiration starts before the listener event arrives.
2. The task then ends up running on top of the killed/killing executor, which finally leads to task failure.
Here is the proposal to fix it. We can add a force flag to killExecutor. If force is not set (i.e., false), we check whether the executor being killed is idle or busy; if it still has assignments, we do not kill it and return false (to indicate killing failure). Dynamic allocation turns force killing off (i.e., force = false), so an attempt to kill a busy executor fails and the executor's idle timer remains valid. Later, when the task assignment event arrives, we can remove the idle timer accordingly. This way we avoid falsely killing busy executors under dynamic allocation.
For the rest of the usages, end users can decide for themselves whether to use force killing. With that option turned on, killExecutor will act without any status checking.
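A sketch of the proposed check; `hasRunningTasks` and `doKillExecutor` are hypothetical stand-ins for the real scheduler hooks:
```scala
trait ExecutorKiller {
  def hasRunningTasks(executorId: String): Boolean // hypothetical helper
  def doKillExecutor(executorId: String): Unit     // hypothetical low-level kill

  def killExecutor(executorId: String, force: Boolean): Boolean = {
    if (!force && hasRunningTasks(executorId)) {
      false // busy executor: refuse and report killing failure
    } else {
      doKillExecutor(executorId)
      true
    }
  }
}
```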
Author: Grace <jie.huang@intel.com>
Author: Andrew Or <andrew@databricks.com>
Author: Jie Huang <jie.huang@intel.com>
Closes #7888 from GraceH/forcekill.
The basic idea is that:
The archive of the SparkR package itself, sparkr.zip, is created during the build process and is contained in the Spark binary distribution. It is not changed after the distribution is installed, as the directory it resides in ($SPARK_HOME/R/lib) may not be writable.
When there is R source code contained in jars or in Spark packages specified with the "--jars" or "--packages" command line options, a temporary directory is created by calling Utils.createTempDir(), and the R packages built from that R source code are installed there. The temporary directory is writable, won't interfere with other sessions' directories when there are multiple SparkR sessions, and will be deleted when this SparkR session ends. The R binary packages installed in the temporary directory are then packed into an archive named rpkg.zip.
sparkr.zip and rpkg.zip are distributed to the cluster in YARN modes.
The distribution of rpkg.zip in Standalone modes is not supported in this PR and will be addressed in another PR.
Various R files are updated to accept multiple lib paths (one for the SparkR package, the other for other R packages) so that these packages can be accessed in R.
Author: Sun Rui <rui.sun@intel.com>
Closes #9390 from sun-rui/SPARK-10500.
Currently, all shuffle writers write to the target path directly, so the file could be corrupted by another attempt of the same partition on the same executor. They should write to a temporary file and then rename it to the target path, as we do in the output committer. In order to make the rename atomic, the temporary file should be created in the same local directory (FileSystem).
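A minimal sketch of the write-then-rename pattern (helper name illustrative); the temporary file lives in the same directory as the target so the rename cannot cross filesystems:
```scala
import java.io.File
import java.util.UUID

def writeAtomically(target: File)(write: File => Unit): Unit = {
  // Create the temp file next to the target so renameTo() stays atomic.
  val tmp = new File(target.getParentFile, s".${target.getName}.${UUID.randomUUID()}.tmp")
  write(tmp)
  if (!tmp.renameTo(target)) {
    // Another attempt of the same partition may have committed first.
    tmp.delete()
  }
}
```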
This PR is based on #9214, thanks to squito. Closes #9214
Author: Davies Liu <davies@databricks.com>
Closes #9610 from davies/safe_shuffle.
This patch aims to reduce the test time and flakiness of HiveSparkSubmitSuite, SparkSubmitSuite, and CliSuite.
Key changes:
- Disable IO synchronization calls for Derby writes, since durability doesn't matter for tests. This was done for HiveCompatibilitySuite in #6651 and resulted in huge test speedups.
- Add a few missing `--conf`s to disable various Spark UIs. The CliSuite, in particular, never disabled these UIs, leaving it prone to port-contention-related flakiness.
- Fix two instances where tests defined `beforeAll()` methods which were never called because the appropriate traits were not mixed in. I updated these test suites to extend `BeforeAndAfterEach` so that they play nicely with our `ResetSystemProperties` trait.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #9623 from JoshRosen/SPARK-11647.
Just trying to increase test coverage in the scheduler; this already works. It includes a regression test for SPARK-9809.
Copied some test utils from https://github.com/apache/spark/pull/5636; we can wait until that is merged first.
Author: Imran Rashid <irashid@cloudera.com>
Closes #8402 from squito/test_retry_in_shared_shuffle_dep.
Changed AppClient to be non-blocking in `receiveAndReply` by using a separate thread to wait for response and reply to the context. The threads are managed by a thread pool. Also added unit tests for the AppClient interface.
Author: Bryan Cutler <bjcutler@us.ibm.com>
Closes #9317 from BryanCutler/appClient-receiveAndReply-SPARK-10827.
In order to lay the groundwork for proper off-heap memory support in SQL / Tungsten, we need to extend our MemoryManager to perform bookkeeping for off-heap memory.
## User-facing changes
This PR introduces a new configuration, `spark.memory.offHeapSize` (name subject to change), which specifies the absolute amount of off-heap memory that Spark and Spark SQL can use. If Tungsten is configured to use off-heap execution memory for allocating data pages, then all data page allocations must fit within this size limit.
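For example (remembering the name is subject to change):
```scala
import org.apache.spark.SparkConf

// Allow Spark / Spark SQL to use up to 4 GB of off-heap memory in total.
val conf = new SparkConf()
  .set("spark.memory.offHeapSize", (4L * 1024 * 1024 * 1024).toString)
```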
## Internals changes
This PR contains a lot of internal refactoring of the MemoryManager. The key change at the heart of this patch is the introduction of a `MemoryPool` class (name subject to change) to manage the bookkeeping for a particular category of memory (storage, on-heap execution, and off-heap execution). These MemoryPools are not fixed-size; they can be dynamically grown and shrunk according to the MemoryManager's policies. In StaticMemoryManager, these pools have fixed sizes, proportional to the legacy `[storage|shuffle].memoryFraction`. In the new UnifiedMemoryManager, the sizes of these pools are dynamically adjusted according to its policies.
There are two subclasses of `MemoryPool`: `StorageMemoryPool` manages storage memory and `ExecutionMemoryPool` manages execution memory. The MemoryManager creates two execution pools, one for on-heap memory and one for off-heap. Instances of `ExecutionMemoryPool` manage the logic for fair sharing of their pooled memory across running tasks (in other words, the ShuffleMemoryManager-like logic has been moved out of MemoryManager and pushed into these ExecutionMemoryPool instances).
I think that this design is substantially easier to understand and reason about than the previous design, where most of these responsibilities were handled by MemoryManager and its subclasses. To see this, take a look at how simple the logic in `UnifiedMemoryManager` has become: it's now very easy to see when memory is dynamically shifted between storage and execution.
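To make that concrete, here is a minimal sketch of the pool abstraction as described (names subject to change, per the above):
```scala
// Bookkeeping for one category of memory; the MemoryManager grows or shrinks
// a pool when shifting memory between storage and execution.
abstract class MemoryPool {
  private var _poolSize: Long = 0L
  def poolSize: Long = _poolSize
  def memoryUsed: Long // provided by StorageMemoryPool / ExecutionMemoryPool
  def memoryFree: Long = _poolSize - memoryUsed
  def incrementPoolSize(delta: Long): Unit = { _poolSize += delta }
  def decrementPoolSize(delta: Long): Unit = { _poolSize -= delta }
}
```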
## TODOs
- [x] Fix handful of test failures in the MemoryManagerSuites.
- [x] Fix remaining TODO comments in code.
- [ ] Document new configuration.
- [x] Fix commented-out tests / asserts:
- [x] UnifiedMemoryManagerSuite.
- [x] Write tests that exercise the new off-heap memory management policies.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #9344 from JoshRosen/offheap-memory-accounting.
https://issues.apache.org/jira/browse/SPARK-10116
This is really trivial, just happened to notice it -- if `XORShiftRandom.hashSeed` is really supposed to have random bits throughout (as the comment implies), it needs to do something for the conversion to `long`.
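One way to do that, sketched here with `MurmurHash3` (an assumption; the committed fix may differ in detail), is to hash the seed bytes twice so both 32-bit halves get random bits:
```scala
import java.nio.ByteBuffer
import scala.util.hashing.MurmurHash3

def hashSeed(seed: Long): Long = {
  val bytes = ByteBuffer.allocate(8).putLong(seed).array()
  val lowBits = MurmurHash3.bytesHash(bytes)
  val highBits = MurmurHash3.bytesHash(bytes, lowBits) // different seed => independent bits
  (highBits.toLong << 32) | (lowBits.toLong & 0xFFFFFFFFL)
}
```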
mengxr mkolod
Author: Imran Rashid <irashid@cloudera.com>
Closes #8314 from squito/SPARK-10116.
This brings support for off-heap memory for the arrays inside BytesToBytesMap and InMemorySorter, so that we can allocate all the memory for execution from off-heap.
Closes #8068
Author: Davies Liu <davies@databricks.com>
Closes #9477 from davies/unsafe_timsort.
OutputCommitCoordinator uses a map in a place where an array would suffice, increasing its memory consumption for result stages with millions of tasks.
This patch replaces that map with an array. The only tricky part of this is reasoning about the range of possible array indexes in order to make sure that we never index out of bounds.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #9274 from JoshRosen/SPARK-11307.
This is an updated version of #8995 by a-roberts. Original description follows:
Snappy now supports concatenation of serialized streams; this patch contains a version number change, and the "does not support" test is now a "supports" test.
Snappy 1.1.2 changelog mentions:
> snappy-java-1.1.2 (22 September 2015)
> This is a backward compatible release for 1.1.x.
> Add AIX (32-bit) support.
> There is no upgrade for the native libraries of the other platforms.
> A major change since 1.1.1 is a support for reading concatenated results of SnappyOutputStream(s)
> snappy-java-1.1.2-RC2 (18 May 2015)
> Fix#107: SnappyOutputStream.close() is not idempotent
> snappy-java-1.1.2-RC1 (13 May 2015)
> SnappyInputStream now supports reading concatenated compressed results of SnappyOutputStream
> There has been no compressed format change since 1.0.5.x. So you can read the compressed results interchangeably between these versions.
> Fixes a problem when java.io.tmpdir does not exist.
Closes #8995.
Author: Adam Roberts <aroberts@uk.ibm.com>
Author: Josh Rosen <joshrosen@databricks.com>
Closes #9439 from JoshRosen/update-snappy.
In YARN mode, when preemption is enabled, we may leave executors in a
zombie state while we wait to retrieve the reason for which the executor
exited. This is so that we don't account for failed tasks that were
running on a preempted executor.
The issue is that while we wait for this information, the scheduler
might decide to schedule tasks on the executor, which will never be
able to run them. Other side effects include the block manager still
considering the executor available to cache blocks, for example.
So, when we know that an executor went down but we don't know why,
stop everything related to the executor, except its running tasks.
Only when we know the reason for the exit (or give up waiting for
it) do we update the running tasks.
This is achieved by a new `disableExecutor()` method in the
`Schedulable` interface. For managers that do not behave like this
(i.e. every one but YARN), the existing `executorLost()` method
will behave the same way it did before.
On top of that change, a few minor changes that made debugging easier,
and fixed some other minor issues:
- The cluster-mode AM was printing a misleading log message every
time an executor disconnected from the driver (because the akka
actor system was shared between driver and AM).
- Avoid sending unnecessary requests for an executor's exit reason
when we already know it was explicitly disabled / killed. This
avoids both multiple requests, and unnecessary requests that would
just cause warning messages on the AM (in the explicit kill case).
- Tone down a log message about the executor being lost when it
  exited normally (e.g. preemption).
- Wake up the AM monitor thread when requests for executor loss
reasons arrive too, so that we can more quickly remove executors
from this zombie state.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #8887 from vanzin/SPARK-10622.
The test functionality should be the same, but without using mockito; the logs don't
really say anything useful, but I suspect mockito may be the cause of the flakiness,
since updating mocks while multiple threads may be using them doesn't work very
well. This also allows some other cleanup (= less test code in FsHistoryProvider).
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #9425 from vanzin/SPARK-11466.
DriverDescription was refactored to a case class because it included no mutable fields.
ApplicationDescription had one mutable field, appUiUrl, which was set by the driver to point to the driver web UI. Master was modifying this field when the application was removed, to redirect requests to the history server. This was wrong, because objects sent over the wire should be immutable. Now appUiUrl is immutable in ApplicationDescription and always points to the driver UI, even if the driver has already shut down. The UI URL which the master exposes to the user and modifies dynamically is now included in ApplicationInfo - the data object which describes the application state internally in the master. That URL in ApplicationInfo is initialised with the value from ApplicationDescription.
ApplicationDescription also included the value `user`, which is now part of the case class fields.
Author: Jacek Lewandowski <lewandowski.jacek@gmail.com>
Closes #9299 from jacek-lewandowski/SPARK-11344.
"Client mode" means the RPC env will not listen for incoming connections.
This allows certain processes in the Spark stack (such as Executors or
the YARN client-mode AM) to act as pure clients when using the netty-based
RPC backend, reducing the number of sockets needed by the app and also the
number of open ports.
Client connections are also preferred when endpoints that actually have
a listening socket are involved; so, for example, if a Worker connects
to a Master and the Master needs to send a message to a Worker endpoint,
that client connection will be used, even though the Worker is also
listening for incoming connections.
With this change, the workaround for SPARK-10987 isn't necessary anymore, and
is removed. The AM connects to the driver in "client mode", and that connection
is used for all driver <-> AM communication, and so the AM is properly notified
when the connection goes down.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #9210 from vanzin/SPARK-10997.
JIRA: https://issues.apache.org/jira/browse/SPARK-11271
As reported in the JIRA ticket, when there are too many tasks, the memory usage of MapStatus causes problems. Using BitSet instead of RoaringBitmap should be more efficient in memory usage.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes #9243 from viirya/mapstatus-bitset.
Use standard JDK APIs for that (with a little help from Guava). Most of the
changes here are in test code, since there were no tests specific to that
part of the code.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #9257 from vanzin/SPARK-11073.
Large HDFS clusters may take a while to leave safe mode when starting; this change
makes the HS wait for that before doing checks about its configuration. This means
the HS won't stop right away if HDFS is in safe mode and the configuration is not
correct, but that should be a very uncommon situation.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #9043 from vanzin/SPARK-11020.
[SPARK-11338: HistoryPage not multi-tenancy enabled ...](https://issues.apache.org/jira/browse/SPARK-11338)
- `HistoryPage.scala` ...prepending all page links with the web proxy (`uiRoot`) path
- `HistoryServerSuite.scala` ...adding a test case to verify all site-relative links are prefixed when the environment variable `APPLICATION_WEB_PROXY_BASE` (or System property `spark.ui.proxyBase`) is set
Author: Christian Kadner <ckadner@us.ibm.com>
Closes #9291 from ckadner/SPARK-11338 and squashes the following commits:
01d2f35 [Christian Kadner] [SPARK-11338][WebUI] nit fixes
d054bd7 [Christian Kadner] [SPARK-11338][WebUI] prependBaseUri in method makePageLink
8bcb3dc [Christian Kadner] [SPARK-11338][WebUI] Prepend application links on HistoryPage with uiRoot path
Since we do not need to preserve a page before calling compute(), MapPartitionsWithPreparationRDD is not needed anymore.
This PR basically reverts #8543, #8511, #8038, and #8011.
Author: Davies Liu <davies@databricks.com>
Closes #9381 from davies/remove_prepare2.
This PR introduces a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is not needed anymore, so it was removed.
Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but they could benefit from it (triggering others' spilling).
The PrepareRDD may not be needed anymore and could be removed in a follow-up PR.
The following script fails with OOM before this PR and finishes in 150 seconds with a 2G heap after it (it also works on the 1.5 branch, with a similar duration).
```python
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s")
df2 = df.select(df.id.alias('id2'), df.s.alias('s2'))
j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2")
j.explain()
print j.count()
```
For thread-safety, here is what I've got:
1) Without calling spill(), the operators should only be used by a single thread, so no safety problems.
2) spill() could be triggered in two cases: by the operator itself, or by other operators. We can check trigger == this in spill(); if so, it's still in the same thread, so no safety problems.
3) If it's triggered by other operators (right now, cache will not trigger spill()), we only spill the data to disk when the operator is in the scanning stage (building is finished), so the in-memory sorter or memory pages are read-only; we only need to synchronize the iterator and change it.
4) During scanning, the iterator will only use one record in one page, and we can't free that page, because the downstream is currently using it (via UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page and dump all the others to disk. In UnsafeExternalSorter, we keep the page that is used by the current record (having the same baseObject) and free it when loading the next record. In ShuffleExternalSorter, spill() will not trigger during scanning.
5) In order to avoid deadlock, we don't call acquireMemory during spill (so we reuse the pointer array in InMemorySorter).
Author: Davies Liu <davies@databricks.com>
Closes #9241 from davies/force_spill.
Commit af3bc59d1f introduced new
functionality so that if an executor dies for a reason that's not
caused by one of the tasks running on the executor (e.g., due to
pre-emption), Spark doesn't count the failure towards the maximum
number of failures for the task. That commit introduced some vague
naming that this commit attempts to fix; in particular:
(1) The variable "isNormalExit", which was used to refer to cases where
the executor died for a reason unrelated to the tasks running on the
machine, has been renamed (and reversed) to "exitCausedByApp". The problem
with the existing name is that it's not clear (at least to me!) what it
means for an exit to be "normal"; the new name is intended to make the
purpose of this variable more clear.
(2) The variable "shouldEventuallyFailJob" has been renamed to
"countTowardsTaskFailures". This variable is used to determine whether
a task's failure should be counted towards the maximum number of failures
allowed for a task before the associated Stage is aborted. The problem
with the existing name is that it can be confused with implying that
the task's failure should immediately cause the stage to fail because it
is somehow fatal (this is the case for a fetch failure, for example: if
a task fails because of a fetch failure, there's no point in retrying,
and the whole stage should be failed).
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes #9164 from kayousterhout/SPARK-11178.
… ReceiverTracker and ReceiverSchedulingPolicy to use it
This PR includes the following changes:
1. Add a new preferred location format, `executor_<host>_<executorID>` (e.g., "executor_localhost_2"), to support specifying the executor locations for RDD.
2. Use the new preferred location format in `ReceiverTracker` to optimize the starting time of Receivers when there are multiple executors in a host.
The goal of this PR is to enable the streaming scheduler to place receivers (which run as tasks) on specific executors. Basically, I want more control over the placement of the receivers so that they are evenly distributed among the executors. We tried to do this without changing the core scheduling logic, but the existing mechanism only works at the host level; it does not allow specifying a particular executor as a preferred location. So if there are two executors on the same host and I want two receivers to run on them (one on each executor), I cannot specify that: current code only specifies the host as the preference, which may end up launching both receivers on the same executor. We tried to work around it by restarting a receiver when it does not launch on the desired executor, hoping that next time it would start on the right one, but that causes lots of restarts and delays in correctly launching the receiver.
So this change allows the streaming scheduler to specify the exact executor as the preferred location. Also, this is not exposed to the user; only the streaming scheduler uses it.
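A sketch of what parsing the new format from (1) could look like (the real logic would live in TaskLocation; names here are illustrative):
```scala
def parseExecutorLocation(loc: String): Option[(String, String)] = {
  // "executor_<host>_<executorID>", e.g. "executor_localhost_2"
  if (loc.startsWith("executor_")) {
    loc.stripPrefix("executor_").split("_", 2) match {
      case Array(host, executorId) => Some((host, executorId))
      case _ => None
    }
  } else {
    None
  }
}

parseExecutorLocation("executor_localhost_2") // Some(("localhost", "2"))
```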
Author: zsxwing <zsxwing@gmail.com>
Closes #9181 from zsxwing/executor-location.
This patch refactors the MemoryManager class structure. After #9000, Spark had the following classes:
- MemoryManager
- StaticMemoryManager
- ExecutorMemoryManager
- TaskMemoryManager
- ShuffleMemoryManager
This is fairly confusing. To simplify things, this patch consolidates several of these classes:
- ShuffleMemoryManager and ExecutorMemoryManager were merged into MemoryManager.
- TaskMemoryManager is moved into Spark Core.
**Key changes and tasks**:
- [x] Merge ExecutorMemoryManager into MemoryManager.
- [x] Move pooling logic into Allocator.
- [x] Move TaskMemoryManager from `spark-unsafe` to `spark-core`.
- [x] Refactor the existing Tungsten TaskMemoryManager interactions so Tungsten code uses only this and not both this and ShuffleMemoryManager.
- [x] Refactor non-Tungsten code to use the TaskMemoryManager instead of ShuffleMemoryManager.
- [x] Merge ShuffleMemoryManager into MemoryManager.
- [x] Move code
- [x] ~~Simplify 1/n calculation.~~ **Will defer to followup, since this needs more work.**
- [x] Port ShuffleMemoryManagerSuite tests.
- [x] Move classes from `unsafe` package to `memory` package.
- [ ] Figure out how to handle the hacky use of the memory managers in HashedRelation's broadcast variable construction.
- [x] Test porting and cleanup: several tests relied on mock functionality (such as `TestShuffleMemoryManager.markAsOutOfMemory`) which has been changed or broken during the memory manager consolidation
- [x] AbstractBytesToBytesMapSuite
- [x] UnsafeExternalSorterSuite
- [x] UnsafeFixedWidthAggregationMapSuite
- [x] UnsafeKVExternalSorterSuite
**Compatibility notes**:
- This patch introduces breaking changes in `ExternalAppendOnlyMap`, which is marked as `DeveloperApi` (likely for legacy reasons): this class now cannot be used outside of a task.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #9127 from JoshRosen/SPARK-10984.
This test can take a little while to finish on slow / loaded machines.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #9235 from vanzin/SPARK-11134.
```
// My machine only has 8 cores
$ bin/spark-shell --master local[32]
scala> val df = sc.parallelize(Seq((1, 1), (2, 2))).toDF("a", "b")
scala> df.as("x").join(df.as("y"), $"x.a" === $"y.a").count()
Caused by: java.io.IOException: Unable to acquire 2097152 bytes of memory
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
```
Author: Andrew Or <andrew@databricks.com>
Closes #9209 from andrewor14/fix-local-page-size.
There's a lot of duplication between SortShuffleManager and UnsafeShuffleManager. Given that these now provide the same set of functionality, now that UnsafeShuffleManager supports large records, I think that we should replace SortShuffleManager's serialized shuffle implementation with UnsafeShuffleManager's and should merge the two managers together.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #8829 from JoshRosen/consolidate-sort-shuffle-implementations.
Correct the logic to return an `HDFSCacheTaskLocation` instance when the input `str` is an in-memory location.
Author: zhichao.li <zhichao.li@intel.com>
Closes #9096 from zhichao-li/uselessBranch.
JIRA: https://issues.apache.org/jira/browse/SPARK-11051
When an `RDD` is materialized and checkpointed, its partitions and dependencies are cleared. If we then allow local checkpointing on it, `LocalRDDCheckpointData` is assigned to its `checkpointData`, and an error is thrown the next time the RDD is materialized.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes #9072 from viirya/no-localcheckpoint-after-checkpoint.
Because the registration RPC was not really an RPC, but a bunch of
disconnected messages, it was possible for other messages to be
sent before the reply to the registration arrived, and that would
confuse the Worker. Especially in local-cluster mode, the worker was
susceptible to receiving an executor request before it received a
message from the master saying registration succeeded.
On top of the above, the change also fixes a ClassCastException when
the registration fails, which also affects the executor registration
protocol. Because the `ask` is issued with a specific return type,
if the error message (of a different type) was returned instead, the
code would just die with an exception. This is fixed by having a common
base trait for these reply messages.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #9138 from vanzin/SPARK-11131.
#9084 uncovered that many tests that test spilling don't actually spill. This is a follow-up patch to fix that to ensure our unit tests actually catch potential bugs in spilling. The size of this patch is inflated by the refactoring of `ExternalSorterSuite`, which had a lot of duplicate code and logic.
Author: Andrew Or <andrew@databricks.com>
Closes #9124 from andrewor14/spilling-tests.
If the heartbeat receiver kills executors (and new ones are not registered to replace them), the idle timeout for the old executors is lost (which then changes the total number of executors requested by the driver), so new ones will not be asked to replace them.
For example, executorsPendingToRemove=Set(1), and executor 2 hits its idle timeout before a new executor is asked to replace executor 1. Then the driver kills executor 2 and sends RequestExecutors to the AM. But executorsPendingToRemove=Set(1, 2), so the AM doesn't allocate an executor to replace 1.
see: https://github.com/apache/spark/pull/8668
Author: KaiXinXiaoLei <huleilei1@huawei.com>
Author: huleilei <huleilei1@huawei.com>
Closes #8945 from KaiXinXiaoLei/pendingexecutor.
Internal accumulators don't write the internal flag to the event log, so on the history server Web UI no accumulators appear as internal. This causes incorrect peak execution memory and an unwanted accumulator table to be displayed on the stage page.
To fix it, I add the "internal" property of AccumulableInfo when writing the event log.
Author: Carson Wang <carson.wang@intel.com>
Closes #9061 from carsonwang/accumulableBug.
Restrict the job to only 1 task, to ensure that the exception asserted for the job failure is the deliberately thrown DAGSchedulerSuiteDummyException, not an UnsupportedOperationException from any second or subsequent task that can propagate from a race condition during code execution.
Author: shellberg <sah@zepler.org>
Closes #9076 from shellberg/shellberg-DAGSchedulerSuite-misbehavedResultHandlerTest-patch-1.
A few more changes:
1. Renamed IDVerifier -> RpcEndpointVerifier
2. Renamed NettyRpcAddress -> RpcEndpointAddress
3. Simplified NettyRpcHandler a bit by removing the connection count tracking. This is OK because I now force spark.shuffle.io.numConnectionsPerPeer to 1
4. Reduced spark.rpc.connect.threads to 64. It would be great to eventually remove this extra thread pool.
5. Minor cleanup & documentation.
Author: Reynold Xin <rxin@databricks.com>
Closes #9112 from rxin/SPARK-11096.
This patch unifies the memory management of the storage and execution regions such that either side can borrow memory from each other. When memory pressure arises, storage will be evicted in favor of execution. To avoid regressions in cases where storage is crucial, we dynamically allocate a fraction of space for storage that execution cannot evict. Several configurations are introduced:
- **spark.memory.fraction (default 0.75)**: fraction of the heap space used for execution and storage. The lower this is, the more frequently spills and cached data eviction occur. The purpose of this config is to set aside memory for internal metadata, user data structures, and imprecise size estimation in the case of sparse, unusually large records.
- **spark.memory.storageFraction (default 0.5)**: size of the storage region within the space set aside by `spark.memory.fraction`. Cached data may only be evicted if total storage exceeds this region.
- **spark.memory.useLegacyMode (default false)**: whether to use the memory management that existed in Spark 1.5 and before. This is mainly for backward compatibility.
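For example, the defaults above correspond to:
```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.memory.fraction", "0.75")       // heap share for execution + storage
  .set("spark.memory.storageFraction", "0.5") // storage region execution cannot evict
  .set("spark.memory.useLegacyMode", "false") // keep the new unified manager
```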
For a detailed description of the design, see [SPARK-10000](https://issues.apache.org/jira/browse/SPARK-10000). This patch builds on top of the `MemoryManager` interface introduced in #9000.
Author: Andrew Or <andrew@databricks.com>
Closes #9084 from andrewor14/unified-memory-manager.
I'm going through the implementation right now for post-hoc review. Adding more comments and renaming things as I go through them.
I also want to write higher level documentation about how the whole thing works -- but those will come in other pull requests.
Author: Reynold Xin <rxin@databricks.com>
Closes #9091 from rxin/rpc-review.
https://issues.apache.org/jira/browse/SPARK-10858
The issue here is that in resolveURI we default to calling new File(path).getAbsoluteFile().toURI(). But if the path passed in already has a # in it, then File(path) will think it is supposed to be part of the actual file path and not a fragment, so it changes # to %23. Then when we try to parse that later in Client as a URI, it doesn't recognize that there is a fragment.
So to fix it, we just check whether there is a fragment, still create the File like we did before, and then add the fragment back on.
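A sketch of that fix, assuming the fragment can simply be split off and reattached (method name and body illustrative):
```scala
import java.io.File
import java.net.URI

def resolveLocalURI(path: String): URI = {
  val fragmentIndex = path.indexOf('#')
  if (fragmentIndex < 0) {
    new File(path).getAbsoluteFile.toURI
  } else {
    // Build the URI from the path alone, then reattach the fragment so the
    // '#' is not escaped to %23.
    val fileUri = new File(path.substring(0, fragmentIndex)).getAbsoluteFile.toURI
    new URI(fileUri.getScheme, fileUri.getSchemeSpecificPart,
      path.substring(fragmentIndex + 1))
  }
}
```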
Author: Tom Graves <tgraves@yahoo-inc.com>
Closes #9035 from tgravescs/SPARK-10858.
This change adds an API that encapsulates information about an app
launched using the library. It also creates a socket-based communication
layer for apps that are launched as child processes; the launching
application listens for connections from launched apps, and once
communication is established, the channel can be used to send updates
to the launching app, or to send commands to the child app.
The change also includes hooks for local, standalone/client and yarn
masters.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #7052 from vanzin/SPARK-8673.
This patch introduces a `MemoryManager` that is the central arbiter of how much memory to grant to storage and execution. This patch is primarily concerned only with refactoring while preserving the existing behavior as much as possible.
This is the first step away from the existing rigid separation of storage and execution memory, which has several major drawbacks discussed on the [issue](https://issues.apache.org/jira/browse/SPARK-10956). It is the precursor of a series of patches that will attempt to address those drawbacks.
Author: Andrew Or <andrew@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>
Author: andrewor14 <andrew@databricks.com>
Closes #9000 from andrewor14/memory-manager.
This PR just reverted 02144d6745 to remerge #6457 and also included the commits in #8905.
Author: zsxwing <zsxwing@gmail.com>
Closes #8944 from zsxwing/SPARK-6028.
Compatibility between history server script and functionality
The history server has its argument parsing class in HistoryServerArguments. However, this doesn't get involved in the start-history-server.sh codepath, where the $0 arg is assigned to spark.history.fs.logDirectory and all other arguments are discarded (e.g. --properties-file).
This prevents the other options from being usable from this script.
Author: Joshi <rekhajoshm@gmail.com>
Author: Rekha Joshi <rekhajoshm@gmail.com>
Closes #8758 from rekhajoshm/SPARK-10317.
Fix the following issues in StandaloneDynamicAllocationSuite:
1. It should not assume master and workers start in order
2. It should not assume master and workers get ready at once
3. It should not assume the application is already registered with master after creating SparkContext
4. It should not access Master.app and idToApp which are not thread safe
The changes includes:
* Use `eventually` to wait until master and workers are ready to fix 1 and 2
* Use `eventually` to wait until the application is registered with master to fix 3
* Use `askWithRetry[MasterStateResponse](RequestMasterState)` to get the application info to fix 4
Author: zsxwing <zsxwing@gmail.com>
Closes #8914 from zsxwing/fix-StandaloneDynamicAllocationSuite.
This makes two changes:
- Allow reduce tasks to fetch multiple map output partitions -- this is a pretty small change to HashShuffleFetcher
- Move shuffle locality computation out of DAGScheduler and into ShuffledRDD / MapOutputTracker; this was needed because the code in DAGScheduler wouldn't work for RDDs that fetch multiple map output partitions from each reduce task
I also added an AdaptiveSchedulingSuite that creates RDDs depending on multiple map output partitions.
Author: Matei Zaharia <matei@databricks.com>
Closes #8844 from mateiz/spark-9852.
The DiskBlockObjectWriter constructor took a BlockId parameter but never used it. As part of some general cleanup in these interfaces, this patch refactors its constructor to eliminate this parameter.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #8871 from JoshRosen/disk-block-object-writer-blockid-cleanup.
The current shuffle code has an interface named ShuffleReader with only one implementation, HashShuffleReader. This naming is confusing, since the same read path code is used for both sort- and hash-based shuffle. This patch addresses this by renaming HashShuffleReader to BlockStoreShuffleReader.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #8825 from JoshRosen/shuffle-reader-cleanup.
The job group and job description are passed through thread-local properties and get inherited by child threads. In the case of Spark Streaming, the streaming jobs inherit these properties from the thread that called streamingContext.start(). This may not make sense:
1. Job group: this is mainly used for cancelling a group of jobs together. It does not make sense to cancel streaming jobs like this, as the effect would be unpredictable; and it's not a valid use case anyway - to cancel a streaming context, call streamingContext.stop().
2. Job description: this is used to attach nice text descriptions to jobs in the UI. The job description of the thread that calls streamingContext.start() is not useful for all the streaming jobs, as it does not make sense for all of them to have the same description, and the description may or may not be related to streaming.
The solution in this PR is meant for the Spark master branch, where local properties are inherited by cloning the properties. The job group and job description in the thread that starts the streaming scheduler are explicitly removed, so that subsequent child threads do not inherit them. Also, the starting is done in a new child thread, so that setting the job group and description for streaming does not change those properties in the thread that called streamingContext.start().
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #8781 from tdas/SPARK-10649.
Track pending tasks by partition ID instead of Task objects.
Before this change, failure & retry could result in a case where a stage got submitted before the map output from its dependencies got registered. This was due to an error in the condition for registering map outputs.
Author: hushan[胡珊] <hushan@xiaomi.com>
Author: Imran Rashid <irashid@cloudera.com>
Closes #7699 from squito/SPARK-5259.
I noticed only one block manager registered with master in an unsuccessful build (https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop2.2,label=spark-test/3534/)
```
15/09/16 13:02:30.981 pool-1-thread-1-ScalaTest-running-BroadcastSuite INFO SparkContext: Running Spark version 1.6.0-SNAPSHOT
...
15/09/16 13:02:38.133 sparkDriver-akka.actor.default-dispatcher-19 INFO BlockManagerMasterEndpoint: Registering block manager localhost:48196 with 530.3 MB RAM, BlockManagerId(0, localhost, 48196)
```
In addition, the first block manager needed 7+ seconds to start. But the test expected 2 block managers so it failed.
However, there was no exception in this log file. So I checked a successful build (https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/3536/AMPLAB_JENKINS_BUILD_PROFILE=hadoop2.2,label=spark-test/) and it needed 4-5 seconds to set up the local cluster:
```
15/09/16 18:11:27.738 sparkWorker1-akka.actor.default-dispatcher-5 INFO Worker: Running Spark version 1.6.0-SNAPSHOT
...
15/09/16 18:11:30.838 sparkDriver-akka.actor.default-dispatcher-20 INFO BlockManagerMasterEndpoint: Registering block manager localhost:54202 with 530.3 MB RAM, BlockManagerId(1, localhost, 54202)
15/09/16 18:11:32.112 sparkDriver-akka.actor.default-dispatcher-20 INFO BlockManagerMasterEndpoint: Registering block manager localhost:32955 with 530.3 MB RAM, BlockManagerId(0, localhost, 32955)
```
In this build, the first block manager needed only 3+ seconds to start.
Comparing these two builds, I guess it's possible that the local cluster in `BroadcastSuite` cannot be ready in 10 seconds if the Jenkins worker is busy. So I just increased the timeout to 60 seconds to see if this can fix the issue.
Author: zsxwing <zsxwing@gmail.com>
Closes #8813 from zsxwing/fix-BroadcastSuite.
It does not make much sense to set `spark.shuffle.spill` or `spark.sql.planner.externalSort` to false: I believe that these configurations were initially added as "escape hatches" to guard against bugs in the external operators, but these operators are now mature and well-tested. In addition, these configurations are not handled in a consistent way anymore: SQL's Tungsten codepath ignores these configurations and will continue to use spilling operators. Similarly, Spark Core's `tungsten-sort` shuffle manager does not respect `spark.shuffle.spill=false`.
This pull request removes these configurations, adds warnings at the appropriate places, and deletes a large amount of code which was only used in code paths that did not support spilling.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #8831 from JoshRosen/remove-ability-to-disable-spilling.
When speculative execution is enabled, consider a scenario where the authorized committer of a particular output partition fails during the OutputCommitter.commitTask() call. In this case, the OutputCommitCoordinator is supposed to release that committer's exclusive lock on committing once that task fails. However, due to a unit mismatch (we used task attempt number in one place and task attempt id in another) the lock will not be released, causing Spark to go into an infinite retry loop.
This bug was masked by the fact that the OutputCommitCoordinator does not have enough end-to-end tests (the current tests use many mocks). Other factors contributing to this bug are the fact that we have many similarly-named identifiers that have different semantics but the same data types (e.g. attemptNumber and taskAttemptId, with inconsistent variable naming which makes them difficult to distinguish).
This patch adds a regression test and fixes this bug by always using task attempt numbers throughout this code.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #8544 from JoshRosen/SPARK-10381.
*Note: this is for master branch only.* The fix for branch-1.5 is at #8721.
The query execution ID is currently passed from a thread to its children, which is not the intended behavior. This led to `IllegalArgumentException: spark.sql.execution.id is already set` when running queries in parallel, e.g.:
```
(1 to 100).par.foreach { _ =>
sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count()
}
```
The cause is `SparkContext`'s local properties are inherited by default. This patch adds a way to exclude keys we don't want to be inherited, and makes SQL go through that code path.
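A sketch of the exclusion mechanism, assuming properties are inherited through an `InheritableThreadLocal` clone (key set illustrative):
```scala
import java.util.Properties
import scala.collection.JavaConverters._

val nonInheritedKeys = Set("spark.sql.execution.id")

val localProperties = new InheritableThreadLocal[Properties] {
  override def childValue(parent: Properties): Properties = {
    // Child threads receive a copy of the parent's properties, minus any
    // keys registered as non-inheritable.
    val child = new Properties()
    parent.stringPropertyNames().asScala
      .filterNot(nonInheritedKeys.contains)
      .foreach(key => child.setProperty(key, parent.getProperty(key)))
    child
  }
}
```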
Author: Andrew Or <andrew@databricks.com>
Closes #8710 from andrewor14/concurrent-sql-executions.
This patch adds support for submitting map stages in a DAG individually so that we can make downstream decisions after seeing statistics about their output, as part of SPARK-9850. I also added more comments to many of the key classes in DAGScheduler. By itself, the patch is not super useful except maybe to switch between a shuffle and broadcast join, but with the other subtasks of SPARK-9850 we'll be able to do more interesting decisions.
The main entry point is SparkContext.submitMapStage, which lets you run a map stage and see stats about the map output sizes. Other stats could also be collected through accumulators. See AdaptiveSchedulingSuite for a short example.
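A sketch based on the description above, assuming an existing SparkContext `sc` (see AdaptiveSchedulingSuite for the real example; exact signatures may differ):
```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency}

val pairs = sc.parallelize(1 to 100, 10).map(x => (x % 5, x))
val dep = new ShuffleDependency[Int, Int, Int](pairs, new HashPartitioner(4))
// Run only the map stage and block until its output statistics are available.
val stats = sc.submitMapStage(dep).get()
// stats.bytesByPartitionId could now drive a shuffle-vs-broadcast decision.
```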
Author: Matei Zaharia <matei@databricks.com>
Closes #8180 from mateiz/spark-9851.
This is a follow-up patch to #8723. I missed one case there.
Author: Andrew Or <andrew@databricks.com>
Closes #8727 from andrewor14/fix-threading-suite.
This is a followup to #8499 which adds a Scalastyle rule to mandate the use of SparkHadoopUtil's JobContext accessor methods and fixes the existing violations.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #8521 from JoshRosen/SPARK-10330-part2.
Fix a few Java API test style issues: unused generic types, exceptions, wrong assert argument order
Author: Sean Owen <sowen@cloudera.com>
Closes #8706 from srowen/SPARK-10547.
This commit ensures if an assertion fails within a thread, it will ultimately fail the test. Otherwise we end up potentially masking real bugs by not propagating assertion failures properly.
Author: Andrew Or <andrew@databricks.com>
Closes #8723 from andrewor14/fix-threading-suite.
ShuffleManager implementations are currently not given type information for
the key, value and combiner classes. Serialization of shuffle objects relies
on objects being JavaSerializable, with methods defined for reading/writing
the object or, alternatively, serialization via Kryo which uses reflection.
Serialization systems like Avro, Thrift and Protobuf generate classes with
zero argument constructors and explicit schema information
(e.g. IndexedRecords in Avro have get, put and getSchema methods).
By serializing the key, value and combiner class names in ShuffleDependency,
shuffle implementations will have access to schema information when
registerShuffle() is called.
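A sketch of how the names can be captured from ClassTags (helper and placement illustrative):
```scala
import scala.reflect.ClassTag

// The runtime class names recorded for a shuffle's key, value and combiner
// types, which registerShuffle() implementations could then inspect.
def shuffleClassNames[K: ClassTag, V: ClassTag, C: ClassTag](): (String, String, String) = (
  implicitly[ClassTag[K]].runtimeClass.getName,
  implicitly[ClassTag[V]].runtimeClass.getName,
  implicitly[ClassTag[C]].runtimeClass.getName
)

shuffleClassNames[String, Int, Int]() // ("java.lang.String", "int", "int")
```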
Author: Matt Massie <massie@cs.berkeley.edu>
Closes #7403 from massie/shuffle-classtags.
This is a regression introduced in #4960, this commit fixes it and adds a test.
tnachen andrewor14 please review, this should be an easy one.
Author: Iulian Dragos <jaguarul@gmail.com>
Closes #8653 from dragos/issue/mesos/fine-grained-maxExecutorCores.
The architecture is that, in YARN mode, if the driver detects that an executor has disconnected, it asks the ApplicationMaster why the executor died. If the ApplicationMaster is aware that the executor died because of preemption, all tasks associated with that executor are not marked as failed. The executor
is still removed from the driver's list of available executors, however.
There are a few open questions:
1. Should standalone mode have a similar "get executor loss reason" as well? I localized this change as much as possible to affect only YARN, but there could be a valid case to differentiate executor losses in standalone mode as well.
2. I make a pretty strong assumption in YarnAllocator that getExecutorLossReason(executorId) will only be called once per executor id; I do this so that I can remove the metadata from the in-memory map to avoid object accumulation. It's not clear if I'm being overly zealous to save space, however.
cc vanzin specifically for review because it collided with some earlier YARN scheduling work.
cc JoshRosen because it's similar to output commit coordination we did in the past
cc andrewor14 for our discussion on how to get executor exit codes and loss reasons
Author: mcheah <mcheah@palantir.com>
Closes #8007 from mccheah/feature/preemption-handling.
We introduced the Netty network module for shuffle in Spark 1.2, and it has been on by default for 3 releases. The old ConnectionManager is difficult to maintain. If we merge the patch now, by the time it is released it will have been a year since ConnectionManager was turned off by default. It's time to remove it.
Author: Reynold Xin <rxin@databricks.com>
Closes #8161 from rxin/SPARK-9767.
[SPARK-9591](https://issues.apache.org/jira/browse/SPARK-9591)
When getting a broadcast variable, we can fetch the block from several locations. But currently, connecting to a lost block manager (e.g., one idle long enough to be removed by the driver when using dynamic resource allocation, and so on) will cause the task to fail, and in the worst case will cause the job to fail.
Author: jeanlyn <jeanlyn92@gmail.com>
Closes #7927 from jeanlyn/catch_exception.
This contribution is my original work and I license the work to the project under the project's open source license.
Author: Pat Shields <yeoldefortran@gmail.com>
Closes #7979 from pashields/env-loading-on-driver.
This is pretty minor, just trying to improve the readability of `DAGSchedulerSuite`, I figure every bit helps. Before whenever I read this test, I never knew what "should work" and "should be ignored" really meant -- this adds some asserts & updates comments to make it more clear. Also some reformatting per a suggestion from markhamstra on https://github.com/apache/spark/pull/7699
Author: Imran Rashid <irashid@cloudera.com>
Closes #8434 from squito/SPARK-10247.
The ```Stage``` class now tracks whether there have been a sufficient number of consecutive failures of that stage to trigger an abort.
To avoid an infinite loop of stage retries, we abort the job completely after 4 consecutive stage failures for one stage. We still allow more than 4 consecutive stage failures if there is an intervening successful attempt for the stage, so that in very long-lived applications, where a stage may get reused many times, we don't abort the job after failures that have been recovered from successfully.
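A sketch of the tracking described above (threshold and names illustrative):
```scala
class StageFailureTracker {
  private val maxConsecutiveFailures = 4
  private var consecutiveFailures = 0

  // Called when a stage attempt fails; returns true if the job should abort.
  def recordFailedAttempt(): Boolean = {
    consecutiveFailures += 1
    consecutiveFailures >= maxConsecutiveFailures
  }

  // A successful attempt resets the count, so long-lived apps that reuse a
  // stage many times are not aborted for failures already recovered from.
  def recordSuccessfulAttempt(): Unit = { consecutiveFailures = 0 }
}
```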
I've added test cases to exercise the most obvious scenarios.
Author: Ilya Ganelin <ilya.ganelin@capitalone.com>
Closes #5636 from ilganeli/SPARK-5945.
SPARK-4223.
Currently we support setting view and modify acls but you have to specify a list of users. It would be nice to support * meaning all users have access.
Manual tests to verify that: "*" works for any user in:
a. Spark ui: view and kill stage. Done.
b. Spark history server. Done.
c. Yarn application killing. Done.
Author: zhuol <zhuol@yahoo-inc.com>
Closes #8398 from zhuoliu/4223.
In SMJ, the first ExternalSorter could consume all the memory before spilling, so the second cannot even acquire its first page.
Before we have a better memory allocator, SMJ should call prepare() before calling any compute() of its children.
cc rxin JoshRosen
Author: Davies Liu <davies@databricks.com>
Closes #8511 from davies/smj_memory.
This change aims at speeding up the dev cycle a little bit, by making
sure that all tests behave the same w.r.t. where the code to be tested
is loaded from. Namely, that means that tests don't rely on the assembly
anymore, rather loading all needed classes from the build directories.
The main change is to make sure all build directories (classes and test-classes)
are added to the classpath of child processes when running tests.
YarnClusterSuite required some custom code since the executors are run
differently (i.e. not through the launcher library, like standalone and
Mesos do).
I also found a couple of tests that could leak a SparkContext on failure,
and added code to handle those.
With this patch, it's possible to run the following command from a clean
source directory and have all tests pass:
mvn -Pyarn -Phadoop-2.4 -Phive-thriftserver install
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#7629 from vanzin/SPARK-9284.
Replace `JavaConversions` implicits with `JavaConverters`
Most occurrences I've seen so far are necessary conversions; a few have been avoidable. None are in critical code as far as I can see, yet.
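A minimal sketch of the difference: `JavaConversions` inserted these conversions implicitly and invisibly, while `JavaConverters` makes every crossing of the Java/Scala collection boundary explicit.
```scala
import scala.collection.JavaConverters._

val javaList = new java.util.ArrayList[String]()
javaList.add("spark")
val asScalaBuffer = javaList.asScala                           // Java -> Scala, explicit
val asJavaList: java.util.List[String] = asScalaBuffer.asJava  // Scala -> Java, explicit
```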
Author: Sean Owen <sowen@cloudera.com>
Closes#8033 from srowen/SPARK-9613.
The peak execution memory metric was introduced in SPARK-8735. That was before Tungsten was enabled by default, so it assumed that `spark.sql.unsafe.enabled` must be explicitly set to true. The result is that the metric is not displayed by default.
Author: Andrew Or <andrew@databricks.com>
Closes#8345 from andrewor14/show-memory-default.
https://issues.apache.org/jira/browse/SPARK-9439
In general, Yarn apps should be robust to NodeManager restarts. However, if you run spark with the external shuffle service on, after a NM restart all shuffles fail, b/c the shuffle service has lost some state with info on each executor. (Note the shuffle data is perfectly fine on disk across a NM restart; the problem is we've lost the small bit of state that lets us *find* those files.)
The solution proposed here is that the external shuffle service can write out its state to leveldb (backed by a local file) every time an executor is added. When running with yarn, that file is in the NM's local dir. Whenever the service is started, it looks for that file, and if it exists, it reads the file and re-registers all executors there.
Nothing is changed in non-yarn modes with this patch. The service is not given a place to save the state to, so it operates the same as before. This should make it easy to update other cluster managers as well, by just supplying the right file & the equivalent of yarn's `initializeApplication` -- I'm not familiar enough with those modes to know how to do that.
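A rough sketch of the recovery mechanism (a simplified, hypothetical Scala shape; the real code is Java in the network/shuffle module and serializes the executor info as JSON):
```scala
// Persist one record per registered executor in a local LevelDB file, and
// replay the file when the service restarts.
import java.io.File
import scala.collection.mutable
import org.iq80.leveldb.{DB, Options}
import org.fusesource.leveldbjni.JniDBFactory.factory

class RecoverableRegistry(registeredExecutorFile: File) {
  private val db: DB =
    factory.open(registeredExecutorFile, new Options().createIfMissing(true))
  val executors = mutable.Map[String, Array[Byte]]()  // appExecId -> serialized info

  def registerExecutor(appExecId: String, serializedInfo: Array[Byte]): Unit = {
    db.put(appExecId.getBytes("UTF-8"), serializedInfo)  // save to db before registering
    executors(appExecId) = serializedInfo
  }

  /** Called on service start: re-register everything found in the db. */
  def reload(): Unit = {
    val it = db.iterator()
    try {
      it.seekToFirst()
      while (it.hasNext) {
        val entry = it.next()
        executors(new String(entry.getKey, "UTF-8")) = entry.getValue
      }
    } finally it.close()
  }
}
```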
Author: Imran Rashid <irashid@cloudera.com>
Closes#7943 from squito/leveldb_external_shuffle_service_NM_restart and squashes the following commits:
0d285d3 [Imran Rashid] review feedback
70951d6 [Imran Rashid] Merge branch 'master' into leveldb_external_shuffle_service_NM_restart
5c71c8c [Imran Rashid] save executor to db before registering; style
2499c8c [Imran Rashid] explicit dependency on jackson-annotations
795d28f [Imran Rashid] review feedback
81f80e2 [Imran Rashid] Merge branch 'master' into leveldb_external_shuffle_service_NM_restart
594d520 [Imran Rashid] use json to serialize application executor info
1a7980b [Imran Rashid] version
8267d2a [Imran Rashid] style
e9f99e8 [Imran Rashid] cleanup the handling of bad dbs a little
9378ba3 [Imran Rashid] fail gracefully on corrupt leveldb files
acedb62 [Imran Rashid] switch to writing out one record per executor
79922b7 [Imran Rashid] rely on yarn to call stopApplication; assorted cleanup
12b6a35 [Imran Rashid] save registered executors when apps are removed; add tests
c878fbe [Imran Rashid] better explanation of shuffle service port handling
694934c [Imran Rashid] only open leveldb connection once per service
d596410 [Imran Rashid] store executor data in leveldb
59800b7 [Imran Rashid] Files.move in case renaming is unsupported
32fe5ae [Imran Rashid] Merge branch 'master' into external_shuffle_service_NM_restart
d7450f0 [Imran Rashid] style
f729e2b [Imran Rashid] debugging
4492835 [Imran Rashid] lol, dont use a PrintWriter b/c of scalastyle checks
0a39b98 [Imran Rashid] Merge branch 'master' into external_shuffle_service_NM_restart
55f49fc [Imran Rashid] make sure the service doesnt die if the registered executor file is corrupt; add tests
245db19 [Imran Rashid] style
62586a6 [Imran Rashid] just serialize the whole executors map
bdbbf0d [Imran Rashid] comments, remove some unnecessary changes
857331a [Imran Rashid] better tests & comments
bb9d1e6 [Imran Rashid] formatting
bdc4b32 [Imran Rashid] rename
86e0cb9 [Imran Rashid] for tests, shuffle service finds an open port
23994ff [Imran Rashid] style
7504de8 [Imran Rashid] style
a36729c [Imran Rashid] cleanup
efb6195 [Imran Rashid] proper unit test, and no longer leak if apps stop during NM restart
dd93dc0 [Imran Rashid] test for shuffle service w/ NM restarts
d596969 [Imran Rashid] cleanup imports
0e9d69b [Imran Rashid] better names
9eae119 [Imran Rashid] cleanup lots of duplication
1136f44 [Imran Rashid] test needs to have an actual shuffle
0b588bd [Imran Rashid] more fixes ...
ad122ef [Imran Rashid] more fixes
5e5a7c3 [Imran Rashid] fix build
c69f46b [Imran Rashid] maybe working version, needs tests & cleanup ...
bb3ba49 [Imran Rashid] minor cleanup
36127d3 [Imran Rashid] wip
b9d2ced [Imran Rashid] incomplete setup for external shuffle service tests
In Scala, `Seq.fill` always seems to return a List. Accessing a list by index is an O(N) operation. Thus, the following code will be really slow (~10 seconds on my machine):
```scala
val numItems = 100000
val s = Seq.fill(numItems)(1)
for (i <- 0 until numItems) s(i)
```
It turns out that we had a loop like this in DAGScheduler code, although it's a little tricky to spot. In `getPreferredLocsInternal`, there's a call to `getCacheLocs(rdd)(partition)`. The `getCacheLocs` call returns a Seq. If this Seq is a List and the RDD contains many partitions, then indexing into this list will cost O(partitions). Thus, when we loop over our tasks to compute their individual preferred locations we implicitly perform an N^2 loop, reducing scheduling throughput.
This patch fixes this by replacing `Seq` with `Array`.
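The fix in miniature, using the same micro-benchmark as above:
```scala
// Array gives O(1) indexing, so the same loop is now linear overall.
val numItems = 100000
val a = Array.fill(numItems)(1)
for (i <- 0 until numItems) a(i)
```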
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8178 from JoshRosen/dagscheduler-perf.
The shuffle locality patch made the DAGScheduler aware of shuffle data,
but for RDDs that have both narrow and shuffle dependencies, it can
cause the scheduler to place tasks based on the shuffle dependency instead
of the narrow one. This case is common in iterative join-based algorithms
like PageRank and ALS, where one RDD is hash-partitioned and one isn't.
Author: Matei Zaharia <matei@databricks.com>
Closes#8220 from mateiz/shuffle-loc-fix.
In these tests, we use a custom listener and we assert on fields in the stage / task completion events. However, these events are posted in a separate thread so they're not guaranteed to be posted in time. This commit fixes this flakiness through a job end registration callback.
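One way to express that (a sketch of the idea, assuming a live `SparkContext` in `sc`; the actual suite change may differ in detail):
```scala
// Block until the job-end event has been delivered before asserting on the
// listener's captured stage/task events.
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

val jobDone = new CountDownLatch(1)
sc.addSparkListener(new SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = jobDone.countDown()
})
sc.parallelize(1 to 10).count()
// The listener bus is ordered, so once onJobEnd fires, all earlier stage and
// task completion events for the job have been posted too.
assert(jobDone.await(10, TimeUnit.SECONDS))
```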
Author: Andrew Or <andrew@databricks.com>
Closes#8176 from andrewor14/fix-accumulator-suite.
I think that we should pass additional configuration flags to disable the driver UI and Master REST server in SparkSubmitSuite and HiveSparkSubmitSuite. This might cut down on port-contention-related flakiness in Jenkins.
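For example (a sketch; `spark.ui.enabled` and `spark.master.rest.enabled` are real configuration keys, while the surrounding argument list is illustrative):
```scala
// Pass extra --conf flags when the suites launch child applications so no
// UI or standalone REST ports are bound.
val submitArgs = Seq(
  "--class", "org.example.TestApp",            // hypothetical test entry point
  "--master", "local-cluster[2,1,512]",
  "--conf", "spark.ui.enabled=false",          // no driver UI port
  "--conf", "spark.master.rest.enabled=false", // no standalone REST server port
  "test.jar")
```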
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8124 from JoshRosen/disable-ui-in-sparksubmitsuite.
… allocation are set. Now, dynamic allocation is set to false when num-executors is explicitly specified as an argument. Consequently, executorAllocationManager is not initialized in the SparkContext.
Author: Niranjan Padmanabhan <niranjan.padmanabhan@cloudera.com>
Closes#7657 from neurons/SPARK-9092.
This is the sister patch to #8011, but for aggregation.
In a nutshell: create the `TungstenAggregationIterator` before computing the parent partition. Internally this creates a `BytesToBytesMap` which acquires a page in the constructor as of this patch. This ensures that the aggregation operator is not starved since we reserve at least 1 page in advance.
rxin yhuai
Author: Andrew Or <andrew@databricks.com>
Closes#8038 from andrewor14/unsafe-starve-memory-agg.
This is based on KaiXinXiaoLei's changes in #7716.
The issue is that when someone calls `sc.killExecutor("1")` on the same executor twice quickly, then the executor target will be adjusted downwards by 2 instead of 1 even though we're only actually killing one executor. In certain cases where we don't adjust the target back upwards quickly, we'll end up with jobs hanging.
This is a common danger because there are many places where this is called:
- `HeartbeatReceiver` kills an executor that has not been sending heartbeats
- `ExecutorAllocationManager` kills an executor that has been idle
- The user code might call this, which may interfere with the previous callers
While it's not clear whether this fixes SPARK-9745, fixing this potential race condition seems like a strict improvement. I've added a regression test to illustrate the issue.
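The fix in sketch form (an assumed shape, not the exact `CoarseGrainedSchedulerBackend` code): track executors already pending removal and only adjust the target for new kills.
```scala
import scala.collection.mutable

class KillTracker(initialTarget: Int) {
  private val executorsPendingToRemove = mutable.HashSet[String]()
  private var targetNumExecutors = initialTarget

  /** Returns the new target; killing the same executor twice is a no-op. */
  def killExecutors(ids: Seq[String]): Int = synchronized {
    val newKills = ids.filterNot(executorsPendingToRemove.contains)
    executorsPendingToRemove ++= newKills
    targetNumExecutors -= newKills.size  // each executor lowers the target at most once
    targetNumExecutors
  }
}
```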
Author: Andrew Or <andrew@databricks.com>
Closes#8078 from andrewor14/da-double-kill.
This allows clients to retrieve the original exception from the
cause field of the SparkException that is thrown by the driver.
If the original exception is not in fact Serializable then it will
not be returned, but the message and stacktrace will be. (All Java
Throwables implement the Serializable interface, but this is no
guarantee that a particular implementation can actually be
serialized.)
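A sketch of the guard (an assumed helper; the real change lives where the driver builds the `SparkException`):
```scala
// Probe whether the user exception actually serializes; if not, drop it as
// the cause and keep only its message and stack trace.
import java.io.{ByteArrayOutputStream, IOException, ObjectOutputStream}

def serializableCauseOrNull(t: Throwable): Throwable = {
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(t)
    t                        // round-trip-safe: keep the original cause
  } catch {
    case _: IOException => null  // declares Serializable but can't actually serialize
  }
}
```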
Author: Tom White <tom@cloudera.com>
Closes#7014 from tomwhite/propagate-user-exceptions.
`InternalAccumulator.create` doesn't call `registerAccumulatorForCleanup` to register itself with ContextCleaner, so `WeakReference`s for these accumulators in `Accumulators.originals` won't be removed.
This PR added `registerAccumulatorForCleanup` for internal accumulators to avoid the memory leak.
Author: zsxwing <zsxwing@gmail.com>
Closes#8108 from zsxwing/internal-accumulators-leak.
PlatformDependent.UNSAFE is way too verbose.
Author: Reynold Xin <rxin@databricks.com>
Closes#8094 from rxin/SPARK-9815 and squashes the following commits:
229b603 [Reynold Xin] [SPARK-9815] Rename PlatformDependent.UNSAFE -> Platform.
The issue only happens if `spark.executor.cores` is not set and executor memory is set to a high value.
For example, if we have a worker with 4G and 10 cores and we set `spark.executor.memory` to 3G, then only 1 core is assigned to the executor. The correct number should be 10 cores.
I've added a unit test to illustrate the issue.
Author: Carson Wang <carson.wang@intel.com>
Closes#8017 from carsonwang/SPARK-9731 and squashes the following commits:
d09ec48 [Carson Wang] Fix code style
86b651f [Carson Wang] Simplify the code
943cc4c [Carson Wang] fix scheduling correct cores to executors
The original code that this test tests was removed in 9270bd06fd. It was ignored shortly before that, so we never caught the breakage. This patch re-enables the test and adds the code necessary to make it pass.
JoshRosen yhuai
Author: Andrew Or <andrew@databricks.com>
Closes#8015 from andrewor14/SPARK-9674 and squashes the following commits:
225eac2 [Andrew Or] Merge branch 'master' of github.com:apache/spark into SPARK-9674
8c24209 [Andrew Or] Fix NPE
e541d64 [Andrew Or] Track aggregation memory for both sort and hash
0be3a42 [Andrew Or] Fix test
This patch exactly follows #7891 (except for testing)
Author: Davies Liu <davies@databricks.com>
Closes#8005 from davies/larger_record and squashes the following commits:
f9c4aff [Davies Liu] address comments
9de5c72 [Davies Liu] support records larger than page size in UnsafeShuffleExternalSorter
Previously, we used 64MB as the default page size, which was way too big for a lot of Spark applications (especially on a single node).
This patch changes it so that the default page size, if unset by the user, is determined by the number of cores available and the total execution memory available.
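One plausible shape for the heuristic (a sketch; the constants here are illustrative rather than Spark's actual values):
```scala
// Scale the page size with per-core execution memory, rounded down to a
// power of two and clamped to a [1MB, 64MB] range.
def defaultPageSize(maxExecutionMemory: Long, requestedCores: Int): Long = {
  val minPageSize = 1L << 20                       // 1MB
  val maxPageSize = 64L << 20                      // 64MB, the old fixed default
  val cores =
    if (requestedCores > 0) requestedCores
    else Runtime.getRuntime.availableProcessors()
  val safetyFactor = 16                            // headroom for several pages per core
  val raw = math.max(1L, maxExecutionMemory / cores / safetyFactor)
  val powerOfTwo = java.lang.Long.highestOneBit(raw)
  math.min(maxPageSize, math.max(minPageSize, powerOfTwo))
}
```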
Author: Reynold Xin <rxin@databricks.com>
Closes#8012 from rxin/pagesize and squashes the following commits:
16f4756 [Reynold Xin] Fixed failing test.
5afd570 [Reynold Xin] private...
0d5fb98 [Reynold Xin] Update default value.
674a6cd [Reynold Xin] Address review feedback.
dc00e05 [Reynold Xin] Merge with master.
73ebdb6 [Reynold Xin] [SPARK-9700] Pick default page size more intelligently.
The issue is that a task may run multiple sorts, and the sorts run by the child operator (i.e. parent RDD) may acquire all available memory such that other sorts in the same task do not have enough to proceed. This manifests itself in an `IOException("Unable to acquire X bytes of memory")` thrown by `UnsafeExternalSorter`.
The solution is to reserve a page in each sorter in the chain before computing the child operator's (parent RDD's) partitions. This requires us to use a new special RDD that does some preparation before computing the parent's partitions.
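The special RDD in sketch form (simplified from the real `MapPartitionsWithPreparationRDD`; the signatures here are assumed):
```scala
import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Run a preparation step (e.g. the sorter reserving its first page) *before*
// asking the parent for its iterator, which may run memory-hungry sorts.
class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M](
    prev: RDD[T],
    preparePartition: () => M,
    executePartition: (M, Iterator[T]) => Iterator[U])
  extends RDD[U](prev) {

  override protected def getPartitions: Array[Partition] = prev.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] = {
    val prepared = preparePartition()                   // reserve memory up front
    val parentIterator = prev.iterator(split, context)  // child operators run here
    executePartition(prepared, parentIterator)
  }
}
```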
Author: Andrew Or <andrew@databricks.com>
Closes#8011 from andrewor14/unsafe-starve-memory and squashes the following commits:
35b69a4 [Andrew Or] Simplify test
0b07782 [Andrew Or] Minor: update comments
5d5afdf [Andrew Or] Merge branch 'master' of github.com:apache/spark into unsafe-starve-memory
254032e [Andrew Or] Add tests
234acbd [Andrew Or] Reserve a page in sorter when preparing each partition
b889e08 [Andrew Or] MapPartitionsWithPreparationRDD
This pull request adds a destructive iterator to BytesToBytesMap. When used, the iterator frees pages as it traverses them. This is part of the effort to avoid starvation when we have more than one operator that can exhaust memory.
This is based on #7924, but fixes a bug there (Don't use destructive iterator in UnsafeKVExternalSorter).
Closes#7924.
Author: Liang-Chi Hsieh <viirya@appier.com>
Author: Reynold Xin <rxin@databricks.com>
Closes#8003 from rxin/map-destructive-iterator and squashes the following commits:
6b618c3 [Reynold Xin] Don't use destructive iterator in UnsafeKVExternalSorter.
a7bd8ec [Reynold Xin] Merge remote-tracking branch 'viirya/destructive_iter' into map-destructive-iterator
7652083 [Liang-Chi Hsieh] For comments: add destructiveIterator(), modify unit test, remove code block.
4a3e9de [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into destructive_iter
581e9e3 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into destructive_iter
f0ff783 [Liang-Chi Hsieh] No need to free last page.
9e9d2a3 [Liang-Chi Hsieh] Add a destructive iterator for BytesToBytesMap.
First, it's probably a bad idea to call generated Scala methods
from Java. In this case, the method being called wasn't actually
"Utils.createTempDir()", but actually the method that returns the
first default argument to the actual createTempDir method, which
is just the location of java.io.tmpdir; meaning that all tests in
the class were using the same temp dir, and thus affecting each
other.
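For reference, a sketch of what the Scala compiler generates for a default argument (the method body here is invented):
```scala
import java.io.File
import java.util.UUID

object Utils {
  def createTempDir(
      root: String = System.getProperty("java.io.tmpdir")): File = {
    val dir = new File(root, "spark-" + UUID.randomUUID)
    dir.mkdirs()
    dir
  }
  // The compiler also emits, roughly:
  //   def createTempDir$default$1(): String = System.getProperty("java.io.tmpdir")
  // Java code that picks the generated method by name can end up calling the
  // default accessor, getting back java.io.tmpdir itself instead of a fresh
  // directory -- which is how every test came to share one temp dir.
}
```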
Second, spillingOccursInResponseToMemoryPressure was not writing
enough records to actually cause a spill.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#7970 from vanzin/SPARK-9651 and squashes the following commits:
74d357f [Marcelo Vanzin] Clean up temp dir on test tear down.
a64f36a [Marcelo Vanzin] [SPARK-9651] Fix UnsafeExternalSorterSuite.
```
Error Message
Failed to bind to: /127.0.0.1:7093: Service 'sparkMaster' failed after 16 retries!
Stacktrace
java.net.BindException: Failed to bind to: /127.0.0.1:7093: Service 'sparkMaster' failed after 16 retries!
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393)
at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
at scala.util.Try$.apply(Try.scala:161)
```
Author: Andrew Or <andrew@databricks.com>
Closes#7968 from andrewor14/fix-master-flaky-test and squashes the following commits:
fcc42ef [Andrew Or] Randomize port
The current implementation of UnsafeExternalSort uses NoOpPrefixComparator for binary-typed data.
So, we need to add a BinaryPrefixComparator to PrefixComparators.
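A sketch of one way to build such a prefix (illustrative, not the exact Spark code): pack the leading bytes big-endian so unsigned 64-bit comparison agrees with lexicographic byte order.
```scala
def binaryPrefix(bytes: Array[Byte]): Long = {
  var prefix = 0L
  val n = math.min(8, bytes.length)
  var i = 0
  while (i < n) {
    prefix |= (bytes(i) & 0xffL) << (56 - 8 * i)  // first byte is most significant
    i += 1
  }
  prefix
}

// Compare prefixes as unsigned longs; equal prefixes fall back to a full
// byte-by-byte comparison of the records.
def comparePrefix(a: Long, b: Long): Int = java.lang.Long.compareUnsigned(a, b)
```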
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes#7676 from maropu/BinaryTypePrefixComparator and squashes the following commits:
fe6f31b [Takeshi YAMAMURO] Apply comments
d943c04 [Takeshi YAMAMURO] Add a codegen'd entry for BinaryType in SortPrefix
ecf3ac5 [Takeshi YAMAMURO] Support BinaryType in PrefixComparator
shivaram cafreeman Could you please help me in testing this out? Exposing and running `rPackageBuilder` from inside the shell works, but for some reason, I can't get it to work during Spark Submit. It just starts relaunching Spark Submit.
For testing, you may use the R branch with [sbt-spark-package](https://github.com/databricks/sbt-spark-package). You can call spPackage, and then pass the jar using `--jars`.
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#7139 from brkyvz/r-submit and squashes the following commits:
0de384f [Burak Yavuz] remove unused imports 2
d253708 [Burak Yavuz] removed unused imports
6603d0d [Burak Yavuz] addressed comments
4258ffe [Burak Yavuz] merged master
ddfcc06 [Burak Yavuz] added zipping test
3a1be7d [Burak Yavuz] don't zip
77995df [Burak Yavuz] fix URI
ac45527 [Burak Yavuz] added zipping of all libs
e6bf7b0 [Burak Yavuz] add println ignores
1bc5554 [Burak Yavuz] add assumes for tests
9778e03 [Burak Yavuz] addressed comments
b42b300 [Burak Yavuz] merged master
ffd134e [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into r-submit
d867756 [Burak Yavuz] add apache header
eff5ba1 [Burak Yavuz] ready for review
8838edb [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into r-submit
e5b5a06 [Burak Yavuz] added doc
bb751ce [Burak Yavuz] fix null bug
0226768 [Burak Yavuz] fixed issues
8810beb [Burak Yavuz] R packages support
https://issues.apache.org/jira/browse/SPARK-9602
Although we have hidden Akka behind RPC interface, I found that the Akka/Actor-related comments are still spreading everywhere. To make it consistent, we shall remove "actor"/"akka" words from the comments...
Author: CodingCat <zhunansjtu@gmail.com>
Closes#7936 from CodingCat/SPARK-9602 and squashes the following commits:
e8296a3 [CodingCat] remove actor words from comments
This patch extends UnsafeExternalSorter to support records larger than the page size. The basic strategy is the same as in #7762: store large records in their own overflow pages.
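The strategy in miniature (a simplified sketch; `MemoryBlock` here is a stand-in for the real unsafe page type):
```scala
case class MemoryBlock(size: Long)

class PageAllocator(pageSize: Long) {
  private var current = MemoryBlock(pageSize)
  private var used = 0L

  def allocate(required: Long): MemoryBlock = {
    if (required > pageSize) {
      MemoryBlock(required)  // dedicated overflow page holding just this record
    } else {
      if (used + required > pageSize) {  // current page full: start a new one
        current = MemoryBlock(pageSize)
        used = 0L
      }
      used += required
      current                // bump-allocate within the shared page
    }
  }
}
```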
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7891 from JoshRosen/large-records-in-sql-sorter and squashes the following commits:
967580b [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
948c344 [Josh Rosen] Add large records tests for KV sorter.
3c17288 [Josh Rosen] Combine memory and disk cleanup into general cleanupResources() method
380f217 [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
27eafa0 [Josh Rosen] Fix page size in PackedRecordPointerSuite
a49baef [Josh Rosen] Address initial round of review comments
3edb931 [Josh Rosen] Remove accidentally-committed debug statements.
2b164e2 [Josh Rosen] Support large records in UnsafeExternalSorter.
Enable most javac lint warnings; fix a lot of build warnings. In a few cases, touch up surrounding code in the process.
I'll explain several of the changes inline in comments.
Author: Sean Owen <sowen@cloudera.com>
Closes#7862 from srowen/SPARK-9534 and squashes the following commits:
ea51618 [Sean Owen] Enable most javac lint warnings; fix a lot of build warnings. In a few cases, touch up surrounding code in the process.
While the functionality is there to exclude packages, there are no flags that allow users to exclude dependencies, in case of dependency conflicts. We should provide users with a flag to add dependency exclusions in case the packages are not resolved properly (or not available due to licensing).
The flag I added was --packages-exclude, but I'm open to renaming it. I also added property flags in case people would like to use a conf file to provide dependencies, which is useful if there is a long list of dependencies or exclusions.
cc andrewor14 vanzin pwendell
Author: Burak Yavuz <brkyvz@gmail.com>
Closes#7599 from brkyvz/packages-exclusions and squashes the following commits:
636f410 [Burak Yavuz] addressed nits
6e54ede [Burak Yavuz] is this the culprit
b5e508e [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into packages-exclusions
154f5db [Burak Yavuz] addressed initial comments
1536d7a [Burak Yavuz] Added flags to exclude packages using --packages-exclude
Certain use cases of Spark involve RDDs with long lineages that must be truncated periodically (e.g. GraphX). The existing way of doing it is through `rdd.checkpoint()`, which is expensive because it writes to HDFS. This patch provides an alternative to truncate lineages cheaply *without providing the same level of fault tolerance*.
**Local checkpointing** writes checkpointed data to the local file system through the block manager. It is much faster than replicating to a reliable storage and provides the same semantics as long as executors do not fail. It is accessible through a new operator `rdd.localCheckpoint()` and leaves the old one unchanged. Users may even decide to combine the two and call the reliable one less frequently.
The bulk of this patch involves refactoring the checkpointing interface to accept custom implementations of checkpointing. [Design doc](https://issues.apache.org/jira/secure/attachment/12741708/SPARK-7292-design.pdf).
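A usage sketch (assuming a live `SparkContext` in `sc`):
```scala
val rdd = sc.parallelize(1 to 1000000).map(_ * 2)
rdd.localCheckpoint()       // truncate lineage via the block manager, not HDFS
rdd.count()                 // the first action materializes the checkpoint
println(rdd.toDebugString)  // the lineage should now be truncated
```
Per the commit notes below, local checkpoints default to the MEMORY_AND_DISK storage level.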
Author: Andrew Or <andrew@databricks.com>
Closes#7279 from andrewor14/local-checkpoint and squashes the following commits:
729600f [Andrew Or] Oops, fix tests
34bc059 [Andrew Or] Avoid computing all partitions in local checkpoint
e43bbb6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
3be5aea [Andrew Or] Address comments
bf846a6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
ab003a3 [Andrew Or] Fix compile
c2e111b [Andrew Or] Address comments
33f167a [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
e908a42 [Andrew Or] Fix tests
f5be0f3 [Andrew Or] Use MEMORY_AND_DISK as the default local checkpoint level
a92657d [Andrew Or] Update a few comments
e58e3e3 [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
4eb6eb1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
1bbe154 [Andrew Or] Simplify LocalCheckpointRDD
48a9996 [Andrew Or] Avoid traversing dependency tree + rewrite tests
62aba3f [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
db70dc2 [Andrew Or] Express local checkpointing through caching the original RDD
87d43c6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into local-checkpoint
c449b38 [Andrew Or] Fix style
4a182f3 [Andrew Or] Add fine-grained tests for local checkpointing
53b363b [Andrew Or] Rename a few more awkwardly named methods (minor)
e4cf071 [Andrew Or] Simplify LocalCheckpointRDD + docs + clean ups
4880deb [Andrew Or] Fix style
d096c67 [Andrew Or] Fix mima
172cb66 [Andrew Or] Fix mima?
e53d964 [Andrew Or] Fix style
56831c5 [Andrew Or] Add a few warnings and clear exception messages
2e59646 [Andrew Or] Add local checkpoint clean up tests
4dbbab1 [Andrew Or] Refactor CheckpointSuite to test local checkpointing
4514dc9 [Andrew Or] Clean local checkpoint files through RDD cleanups
0477eec [Andrew Or] Rename a few methods with awkward names (minor)
2e902e5 [Andrew Or] First implementation of local checkpointing
8447454 [Andrew Or] Fix tests
4ac1896 [Andrew Or] Refactor checkpoint interface for modularity
This patch builds directly on #7820, which is largely written by tnachen. The only addition is one commit for cleaning up the code. There should be no functional differences between this and #7820.
Author: Timothy Chen <tnachen@gmail.com>
Author: Andrew Or <andrew@databricks.com>
Closes#7881 from andrewor14/tim-cleanup-mesos-shuffle and squashes the following commits:
8894f7d [Andrew Or] Clean up code
2a5fa10 [Andrew Or] Merge branch 'mesos_shuffle_clean' of github.com:tnachen/spark into tim-cleanup-mesos-shuffle
fadff89 [Timothy Chen] Address comments.
e4d0f1d [Timothy Chen] Clean up external shuffle data on driver exit with Mesos.
This pull request adds a destructAndCreateExternalSorter method to UnsafeFixedWidthAggregationMap. The new method does the following:
1. Creates a new external sorter UnsafeKVExternalSorter
2. Adds all the data to an in-memory sorter and sorts it
3. Spills the sorted in-memory data to disk
This method can be used to fall back to sort-based aggregation when under memory pressure.
The pull request also includes accounting fixes from JoshRosen.
TODOs (that can be done in follow-up PRs)
- [x] Address Josh's feedback from #7849
- [x] More documentation and test cases
- [x] Make sure we are doing memory accounting correctly with test cases (e.g. did we release the memory in BytesToBytesMap twice?)
- [ ] Look harder at possible memory leaks and exception handling
- [ ] Randomized tester for the KV sorter as well as the aggregation map
Author: Reynold Xin <rxin@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7860 from rxin/kvsorter and squashes the following commits:
986a58c [Reynold Xin] Bug fix.
599317c [Reynold Xin] Style fix and slightly more compact code.
fe7bd4e [Reynold Xin] Bug fixes.
fd71bef [Reynold Xin] Merge remote-tracking branch 'josh/large-records-in-sql-sorter' into kvsorter-with-josh-fix
3efae38 [Reynold Xin] More fixes and documentation.
45f1b09 [Josh Rosen] Ensure that spill files are cleaned up
f6a9bd3 [Reynold Xin] Josh feedback.
9be8139 [Reynold Xin] Remove testSpillFrequency.
7cbe759 [Reynold Xin] [SPARK-9531][SQL] UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter.
ae4a8af [Josh Rosen] Detect leaked unsafe memory in UnsafeExternalSorterSuite.
52f9b06 [Josh Rosen] Detect ShuffleMemoryManager leaks in UnsafeExternalSorter.
Dynamic allocation is a feature that allows a Spark application to scale the number of executors up and down dynamically based on the workload. Support was first introduced for YARN in Spark 1.2, and was recently extended to Mesos coarse-grained mode. Today, it is finally supported in standalone mode as well!
I tested this locally and it works as expected. This is WIP because unit tests are coming.
Author: Andrew Or <andrew@databricks.com>
Closes#7532 from andrewor14/standalone-da and squashes the following commits:
b3c1736 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
879e928 [Andrew Or] Add end-to-end tests for standalone dynamic allocation
accc8f6 [Andrew Or] Address comments
ee686a8 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
c0a2c02 [Andrew Or] Fix build after merge conflict
24149eb [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
2e762d6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
6832bd7 [Andrew Or] Add tests for scheduling with executor limit
a82e907 [Andrew Or] Fix comments
0a8be79 [Andrew Or] Simplify logic by removing the worker blacklist
b7742af [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
2eb5f3f [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
1334e9a [Andrew Or] Fix MiMa
32abe44 [Andrew Or] Fix style
58cb06f [Andrew Or] Privatize worker blacklist for cleanliness
42ac215 [Andrew Or] Clean up comments and rewrite code for readability
49702d1 [Andrew Or] Clean up shuffle files after application exits
80047aa [Andrew Or] First working implementation
BytesToBytesMap currently encodes key/value data in the following format:
```
8B key length, key data, 8B value length, value data
```
UnsafeExternalSorter, on the other hand, encodes data this way:
```
4B record length, data
```
As a result, we cannot pass records encoded by BytesToBytesMap directly into UnsafeExternalSorter for sorting. However, if we rearrange data slightly, we can then pass the key/value records directly into UnsafeExternalSorter:
```
4B key+value length, 4B key length, key data, value data
```
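A sketch of writing a record in that layout (not the actual code; `Platform` is the unsafe-access helper renamed earlier in this log):
```scala
import org.apache.spark.unsafe.Platform

// Write: 4B key+value length, 4B key length, key data, value data.
def writeRecord(page: Array[Byte], offset: Int,
                key: Array[Byte], value: Array[Byte]): Int = {
  val base: AnyRef = page
  val off: Long = Platform.BYTE_ARRAY_OFFSET + offset
  Platform.putInt(base, off, key.length + value.length)
  Platform.putInt(base, off + 4, key.length)
  Platform.copyMemory(key, Platform.BYTE_ARRAY_OFFSET, base, off + 8, key.length)
  Platform.copyMemory(value, Platform.BYTE_ARRAY_OFFSET, base,
    off + 8 + key.length, value.length)
  offset + 8 + key.length + value.length  // position after this record
}
```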
Author: Reynold Xin <rxin@databricks.com>
Closes#7845 from rxin/kvsort-rebase and squashes the following commits:
5716b59 [Reynold Xin] Fixed test.
2e62ccb [Reynold Xin] Updated BytesToBytesMap's data encoding to put the key first.
a51b641 [Reynold Xin] Added a KV sorter interface.
This patch adds support for entries larger than the default page size in BytesToBytesMap. These large rows are handled by allocating special overflow pages to hold individual entries.
In addition, this patch integrates BytesToBytesMap with the ShuffleMemoryManager:
- Move BytesToBytesMap from `unsafe` to `core` so that it can import `ShuffleMemoryManager`.
- Before allocating new data pages, ask the ShuffleMemoryManager to reserve the memory:
- `putNewKey()` now returns a boolean to indicate whether the insert succeeded or failed due to a lack of memory. The caller can use this value to respond to the memory pressure (e.g. by spilling); see the sketch after this list.
- `UnsafeFixedWidthAggregationMap.getAggregationBuffer()` now returns `null` to signal failure due to a lack of memory.
- Updated all uses of these classes to handle these error conditions.
- Added new tests for allocating large records and for allocations which fail due to memory pressure.
- Extended the `afterAll()` test teardown methods to detect ShuffleMemoryManager leaks.
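A hedged sketch of the caller-side contract for `putNewKey()` (method names per the list above; the fallback path is only indicated):
```scala
import org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET
import org.apache.spark.unsafe.map.BytesToBytesMap

def tryInsert(map: BytesToBytesMap, key: Array[Byte], value: Array[Byte]): Boolean = {
  val loc = map.lookup(key, BYTE_ARRAY_OFFSET, key.length)
  if (loc.isDefined) return true  // key already present; the update path is omitted
  val ok = loc.putNewKey(key, BYTE_ARRAY_OFFSET, key.length,
                         value, BYTE_ARRAY_OFFSET, value.length)
  // A false return means the ShuffleMemoryManager would not grant a new page;
  // the caller must release memory (e.g. by spilling) or fall back to a
  // sort-based path before retrying.
  ok
}
```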
Author: Josh Rosen <joshrosen@databricks.com>
Closes#7762 from JoshRosen/large-rows and squashes the following commits:
ae7bc56 [Josh Rosen] Fix compilation
82fc657 [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-rows
34ab943 [Josh Rosen] Remove semi
31a525a [Josh Rosen] Integrate BytesToBytesMap with ShuffleMemoryManager.
626b33c [Josh Rosen] Move code to sql/core and spark/core packages so that ShuffleMemoryManager can be integrated
ec4484c [Josh Rosen] Move BytesToBytesMap from unsafe package to core.
642ed69 [Josh Rosen] Rename size to numElements
bea1152 [Josh Rosen] Add basic test.
2cd3570 [Josh Rosen] Remove accidental duplicated code
07ff9ef [Josh Rosen] Basic support for large rows in BytesToBytesMap.
Author: Reynold Xin <rxin@databricks.com>
Closes#7803 from rxin/SPARK-9458 and squashes the following commits:
5b032dc [Reynold Xin] Fix string.
b670dbb [Reynold Xin] [SPARK-9458][SPARK-9469][SQL] Code generate prefix computation in sorting & moves unsafe conversion out of TungstenSort.