`DAGSchedulerEventLoop` normally only logs errors (so it can continue to process more events, from other jobs). However, this is not desirable in the tests -- the tests should be able to easily detect any exception, and also shouldn't silently succeed if there is an exception.
This was suggested by mateiz on https://github.com/apache/spark/pull/7699. It may have already turned up an issue in "zero split job".
Author: Imran Rashid <irashid@cloudera.com>
Closes#8466 from squito/SPARK-10248.
```
Exception in thread "main" org.apache.spark.rpc.RpcTimeoutException:
Cannot receive any reply in ${timeout.duration}. This timeout is controlled by spark.rpc.askTimeout
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
```
Author: Andrew Or <andrew@databricks.com>
Closes#10334 from andrewor14/rpc-typo.
SPARK_HOME is now causing problem with Mesos cluster mode since spark-submit script has been changed recently to take precendence when running spark-class scripts to look in SPARK_HOME if it's defined.
We should skip passing SPARK_HOME from the Spark client in cluster mode with Mesos, since Mesos shouldn't use this configuration but should use spark.executor.home instead.
Author: Timothy Chen <tnachen@gmail.com>
Closes#10332 from tnachen/scheduler_ui.
This change builds the event history of completed apps asynchronously so the RPC thread will not be blocked and allow new workers to register/remove if the event log history is very large and takes a long time to rebuild.
Author: Bryan Cutler <bjcutler@us.ibm.com>
Closes#10284 from BryanCutler/async-MasterUI-SPARK-12062.
These changes rework the implementations of `SimpleFutureAction`, `ComplexFutureAction`, `JobWaiter`, and `AsyncRDDActions` such that asynchronous callbacks on the generated `Futures` NEVER block waiting for a job to complete. A small amount of mutex synchronization is necessary to protect the internal fields that manage cancellation, but these locks are only held very briefly and in practice should almost never cause any blocking to occur. The existing blocking APIs of these classes are retained, but they simply delegate to the underlying non-blocking API and `Await` the results with indefinite timeouts.
Associated JIRA ticket: https://issues.apache.org/jira/browse/SPARK-9026
Also fixes: https://issues.apache.org/jira/browse/SPARK-4514
This pull request contains all my own original work, which I release to the Spark project under its open source license.
Author: Richard W. Eggert II <richard.eggert@gmail.com>
Closes#9264 from reggert/fix-futureaction.
https://issues.apache.org/jira/browse/SPARK-9516
- [x] new look of Thread Dump Page
- [x] click column title to sort
- [x] grep
- [x] search as you type
squito JoshRosen It's ready for the review now
Author: CodingCat <zhunansjtu@gmail.com>
Closes#7910 from CodingCat/SPARK-9516.
Replace shuffleManagerClassName with shortShuffleMgrName is to reduce time of string's comparison. and put sort's comparison on the front. cc JoshRosen andrewor14
Author: Lianhui Wang <lianhuiwang09@gmail.com>
Closes#10131 from lianhuiwang/spark-12130.
1. Make sure workers and masters exit so that no worker or master will still be running when triggering the shutdown hook.
2. Set ExecutorState to FAILED if it's still RUNNING when executing the shutdown hook.
This should fix the potential exceptions when exiting a local cluster
```
java.lang.AssertionError: assertion failed: executor 4 state transfer from RUNNING to RUNNING is illegal
at scala.Predef$.assert(Predef.scala:179)
at org.apache.spark.deploy.master.Master$$anonfun$receive$1.applyOrElse(Master.scala:260)
at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
java.lang.IllegalStateException: Shutdown hooks cannot be modified during shutdown.
at org.apache.spark.util.SparkShutdownHookManager.add(ShutdownHookManager.scala:246)
at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:191)
at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:180)
at org.apache.spark.deploy.worker.ExecutorRunner.start(ExecutorRunner.scala:73)
at org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:474)
at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#10269 from zsxwing/executor-state.
**Problem.** In unified memory management, acquiring execution memory may lead to eviction of storage memory. However, the space freed from evicting cached blocks is distributed among all active tasks. Thus, an incorrect upper bound on the execution memory per task can cause the acquisition to fail, leading to OOM's and premature spills.
**Example.** Suppose total memory is 1000B, cached blocks occupy 900B, `spark.memory.storageFraction` is 0.4, and there are two active tasks. In this case, the cap on task execution memory is 100B / 2 = 50B. If task A tries to acquire 200B, it will evict 100B of storage but can only acquire 50B because of the incorrect cap. For another example, see this [regression test](https://github.com/andrewor14/spark/blob/fix-oom/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala#L233) that I stole from JoshRosen.
**Solution.** Fix the cap on task execution memory. It should take into account the space that could have been freed by storage in addition to the current amount of memory available to execution. In the example above, the correct cap should have been 600B / 2 = 300B.
This patch also guards against the race condition (SPARK-12253):
(1) Existing tasks collectively occupy all execution memory
(2) New task comes in and blocks while existing tasks spill
(3) After tasks finish spilling, another task jumps in and puts in a large block, stealing the freed memory
(4) New task still cannot acquire memory and goes back to sleep
Author: Andrew Or <andrew@databricks.com>
Closes#10240 from andrewor14/fix-oom.
This patch adds documentation for Spark configurations that affect off-heap memory and makes some naming and validation improvements for those configs.
- Change `spark.memory.offHeapSize` to `spark.memory.offHeap.size`. This is fine because this configuration has not shipped in any Spark release yet (it's new in Spark 1.6).
- Deprecated `spark.unsafe.offHeap` in favor of a new `spark.memory.offHeap.enabled` configuration. The motivation behind this change is to gather all memory-related configurations under the same prefix.
- Add a check which prevents users from setting `spark.memory.offHeap.enabled=true` when `spark.memory.offHeap.size == 0`. After SPARK-11389 (#9344), which was committed in Spark 1.6, Spark enforces a hard limit on the amount of off-heap memory that it will allocate to tasks. As a result, enabling off-heap execution memory without setting `spark.memory.offHeap.size` will lead to immediate OOMs. The new configuration validation makes this scenario easier to diagnose, helping to avoid user confusion.
- Document these configurations on the configuration page.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#10237 from JoshRosen/SPARK-12251.
This avoids bringing up yet another HTTP server on the driver, and
instead reuses the file server already managed by the driver's
RpcEnv. As a bonus, the repl now inherits the security features of
the network library.
There's also a small change to create the directory for storing classes
under the root temp dir for the application (instead of directly
under java.io.tmpdir).
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9923 from vanzin/SPARK-11563.
Don't warn when description isn't valid HTML since it may properly be like "SELECT ... where foo <= 1"
The tests for this code indicate that it's normal to handle strings like this that don't contain HTML as a string rather than markup. Hence logging every such instance as a warning is too noisy since it's not a problem. this is an issue for stages whose name contain SQL like the above
CC tdas as author of this bit of code
Author: Sean Owen <sowen@cloudera.com>
Closes#10159 from srowen/SPARK-11824.
This patch fixes a bug in the eviction of storage memory by execution.
## The bug:
In general, execution should be able to evict storage memory when the total storage memory usage is greater than `maxMemory * spark.memory.storageFraction`. Due to a bug, however, Spark might wind up evicting no storage memory in certain cases where the storage memory usage was between `maxMemory * spark.memory.storageFraction` and `maxMemory`. For example, here is a regression test which illustrates the bug:
```scala
val maxMemory = 1000L
val taskAttemptId = 0L
val (mm, ms) = makeThings(maxMemory)
// Since we used the default storage fraction (0.5), we should be able to allocate 500 bytes
// of storage memory which are immune to eviction by execution memory pressure.
// Acquire enough storage memory to exceed the storage region size
assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.executionMemoryUsed === 0L)
assert(mm.storageMemoryUsed === 750L)
// At this point, storage is using 250 more bytes of memory than it is guaranteed, so execution
// should be able to reclaim up to 250 bytes of storage memory.
// Therefore, execution should now be able to require up to 500 bytes of memory:
assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L) // <--- fails by only returning 250L
assert(mm.storageMemoryUsed === 500L)
assert(mm.executionMemoryUsed === 500L)
assertEvictBlocksToFreeSpaceCalled(ms, 250L)
```
The problem relates to the control flow / interaction between `StorageMemoryPool.shrinkPoolToReclaimSpace()` and `MemoryStore.ensureFreeSpace()`. While trying to allocate the 500 bytes of execution memory, the `UnifiedMemoryManager` discovers that it will need to reclaim 250 bytes of memory from storage, so it calls `StorageMemoryPool.shrinkPoolToReclaimSpace(250L)`. This method, in turn, calls `MemoryStore.ensureFreeSpace(250L)`. However, `ensureFreeSpace()` first checks whether the requested space is less than `maxStorageMemory - storageMemoryUsed`, which will be true if there is any free execution memory because it turns out that `MemoryStore.maxStorageMemory = (maxMemory - onHeapExecutionMemoryPool.memoryUsed)` when the `UnifiedMemoryManager` is used.
The control flow here is somewhat confusing (it grew to be messy / confusing over time / as a result of the merging / refactoring of several components). In the pre-Spark 1.6 code, `ensureFreeSpace` was called directly by the `MemoryStore` itself, whereas in 1.6 it's involved in a confusing control flow where `MemoryStore` calls `MemoryManager.acquireStorageMemory`, which then calls back into `MemoryStore.ensureFreeSpace`, which, in turn, calls `MemoryManager.freeStorageMemory`.
## The solution:
The solution implemented in this patch is to remove the confusing circular control flow between `MemoryManager` and `MemoryStore`, making the storage memory acquisition process much more linear / straightforward. The key changes:
- Remove a layer of inheritance which made the memory manager code harder to understand (53841174760a24a0df3eb1562af1f33dbe340eb9).
- Move some bounds checks earlier in the call chain (13ba7ada77f87ef1ec362aec35c89a924e6987cb).
- Refactor `ensureFreeSpace()` so that the part which evicts blocks can be called independently from the part which checks whether there is enough free space to avoid eviction (7c68ca09cb1b12f157400866983f753ac863380e).
- Realize that this lets us remove a layer of overloads from `ensureFreeSpace` (eec4f6c87423d5e482b710e098486b3bbc4daf06).
- Realize that `ensureFreeSpace()` can simply be replaced with an `evictBlocksToFreeSpace()` method which is called [after we've already figured out](2dc842aea8/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala (L88)) how much memory needs to be reclaimed via eviction; (2dc842aea82c8895125d46a00aa43dfb0d121de9).
Along the way, I fixed some problems with the mocks in `MemoryManagerSuite`: the old mocks would [unconditionally](80a824d36e/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala (L84)) report that a block had been evicted even if there was enough space in the storage pool such that eviction would be avoided.
I also fixed a problem where `StorageMemoryPool._memoryUsed` might become negative due to freed memory being double-counted when excution evicts storage. The problem was that `StorageMemoryPoolshrinkPoolToFreeSpace` would [decrement `_memoryUsed`](7c68ca09cb (diff-935c68a9803be144ed7bafdd2f756a0fL133)) even though `StorageMemoryPool.freeMemory` had already decremented it as each evicted block was freed. See SPARK-12189 for details.
Author: Josh Rosen <joshrosen@databricks.com>
Author: Andrew Or <andrew@databricks.com>
Closes#10170 from JoshRosen/SPARK-12165.
Because of AM failure, the target executor number between driver and AM will be different, which will lead to unexpected behavior in dynamic allocation. So when AM is re-registered with driver, state in `ExecutorAllocationManager` and `CoarseGrainedSchedulerBacked` should be reset.
This issue is originally addressed in #8737 , here re-opened again. Thanks a lot KaiXinXiaoLei for finding this issue.
andrewor14 and vanzin would you please help to review this, thanks a lot.
Author: jerryshao <sshao@hortonworks.com>
Closes#9963 from jerryshao/SPARK-10582.
SPARK-12060 fixed JavaSerializerInstance.serialize
This PR applies the same technique on two other classes.
zsxwing
Author: tedyu <yuzhihong@gmail.com>
Closes#10177 from tedyu/master.
The json endpoint for stages doesn't include information on the stage duration that is present in the UI. This looks like a simple oversight, they should be included. eg., the metrics should be included at api/v1/applications/<appId>/stages.
Metrics I've added are: submissionTime, firstTaskLaunchedTime and completionTime
Author: Xin Ren <iamshrek@126.com>
Closes#10107 from keypointt/SPARK-11155.
Merged #10051 again since #10083 is resolved.
This reverts commit 328b757d5d.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#10167 from zsxwing/merge-SPARK-12060.
`ByteBuffer` doesn't guarantee all contents in `ByteBuffer.array` are valid. E.g, a ByteBuffer returned by `ByteBuffer.slice`. We should not use the whole content of `ByteBuffer` unless we know that's correct.
This patch fixed all places that use `ByteBuffer.array` incorrectly.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#10083 from zsxwing/bytebuffer-array.
Using Dynamic Allocation function, when a new AM is starting, and ExecutorAllocationManager send RequestExecutor message to AM. If the container allocator is not ready, the whole app will hang on
Author: meiyoula <1039320815@qq.com>
Closes#10138 from XuTingjun/patch-1.
We should upgrade to SBT 0.13.9, since this is a requirement in order to use SBT's new Maven-style resolution features (which will be done in a separate patch, because it's blocked by some binary compatibility issues in the POM reader plugin).
I also upgraded Scalastyle to version 0.8.0, which was necessary in order to fix a Scala 2.10.5 compatibility issue (see https://github.com/scalastyle/scalastyle/issues/156). The newer Scalastyle is slightly stricter about whitespace surrounding tokens, so I fixed the new style violations.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#10112 from JoshRosen/upgrade-to-sbt-0.13.9.
This replaces https://github.com/apache/spark/pull/9696
Invoke Checkstyle and print any errors to the console, failing the step.
Use Google's style rules modified according to
https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide
Some important checks are disabled (see TODOs in `checkstyle.xml`) due to
multiple violations being present in the codebase.
Suggest fixing those TODOs in a separate PR(s).
More on Checkstyle can be found on the [official website](http://checkstyle.sourceforge.net/).
Sample output (from [build 46345](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46345/consoleFull)) (duplicated because I run the build twice with different profiles):
> Checkstyle checks failed at following occurrences:
[ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java:[217,7] (coding) MissingSwitchDefault: switch without "default" clause.
> [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:[198,10] (modifier) ModifierOrder: 'protected' modifier out of order with the JLS suggestions.
> [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java:[217,7] (coding) MissingSwitchDefault: switch without "default" clause.
> [ERROR] src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:[198,10] (modifier) ModifierOrder: 'protected' modifier out of order with the JLS suggestions.
> [error] running /home/jenkins/workspace/SparkPullRequestBuilder2/dev/lint-java ; received return code 1
Also fix some of the minor violations that didn't require sweeping changes.
Apologies for the previous botched PRs - I finally figured out the issue.
cr: JoshRosen, pwendell
> I state that the contribution is my original work, and I license the work to the project under the project's open source license.
Author: Dmitry Erastov <derastov@gmail.com>
Closes#9867 from dskrvk/master.
When the spillable sort iterator was spilled, it was mistakenly keeping
the last page in memory rather than the current page. This causes the
current record to get corrupted.
Author: Nong <nong@cloudera.com>
Closes#10142 from nongli/spark-12089.
Resubmit #9297 and #9991
On the live web UI, there is a SQL tab which provides valuable information for the SQL query. But once the workload is finished, we won't see the SQL tab on the history server. It will be helpful if we support SQL UI on the history server so we can analyze it even after its execution.
To support SQL UI on the history server:
1. I added an onOtherEvent method to the SparkListener trait and post all SQL related events to the same event bus.
2. Two SQL events SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd are defined in the sql module.
3. The new SQL events are written to event log using Jackson.
4. A new trait SparkHistoryListenerFactory is added to allow the history server to feed events to the SQL history listener. The SQL implementation is loaded at runtime using java.util.ServiceLoader.
Author: Carson Wang <carson.wang@intel.com>
Closes#10061 from carsonwang/SqlHistoryUI.
TaskAttemptContext's constructor will clone the configuration instead of referencing it. Calling setConf after creating TaskAttemptContext makes any changes to the configuration made inside setConf unperceived by RecordReader instances.
As an example, Titan's InputFormat will change conf when calling setConf. They wrap their InputFormat around Cassandra's ColumnFamilyInputFormat, and append Cassandra's configuration. This change fixes the following error when using Titan's CassandraInputFormat with Spark:
*java.lang.RuntimeException: org.apache.thrift.protocol.TProtocolException: Required field 'keyspace' was not present! Struct: set_key space_args(keyspace:null)*
There's a discussion of this error here: https://groups.google.com/forum/#!topic/aureliusgraphs/4zpwyrYbGAE
Author: Anderson de Andrade <adeandrade@verticalscope.com>
Closes#10046 from adeandrade/newhadooprdd-fix.
**Problem.** Event logs in 1.6 were much bigger than 1.5. I ran page rank and the event log size in 1.6 was almost 5x that in 1.5. I did a bisect to find that the RDD callsite added in #9398 is largely responsible for this.
**Solution.** This patch removes the long form of the callsite (which is not used!) from the event log. This reduces the size of the event log significantly.
*Note on compatibility*: if this patch is to be merged into 1.6.0, then it won't break any compatibility. Otherwise, if it is merged into 1.6.1, then we might need to add more backward compatibility handling logic (currently does not exist yet).
Author: Andrew Or <andrew@databricks.com>
Closes#10115 from andrewor14/smaller-event-logs.
`SynchronousQueue` cannot cache any task. This issue is similar to #9978. It's an easy fix. Just use the fixed `ThreadUtils.newDaemonCachedThreadPool`.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#10108 from zsxwing/fix-threadpool.
Downgrade to warning log for unexpected state transition.
andrewor14 please review, thanks a lot.
Author: jerryshao <sshao@hortonworks.com>
Closes#10091 from jerryshao/SPARK-12059.
This is purely the yarn/src/main and yarn/src/test bits of the YARN ATS integration: the extension model to load and run implementations of `SchedulerExtensionService` in the yarn cluster scheduler process —and to stop them afterwards.
There's duplication between the two schedulers, yarn-client and yarn-cluster, at least in terms of setting everything up, because the common superclass, `YarnSchedulerBackend` is in spark-core, and the extension services need the YARN app/attempt IDs.
If you look at how the the extension services are loaded, the case class `SchedulerExtensionServiceBinding` is used to pass in config info -currently just the spark context and the yarn IDs, of which one, the attemptID, will be null when running client-side. I'm passing in a case class to ensure that it would be possible in future to add extra arguments to the binding class, yet, as the method signature will not have changed, still be able to load existing services.
There's no functional extension service here, just one for testing. The real tests come in the bigger pull requests. At the same time, there's no restriction of this extension service purely to the ATS history publisher. Anything else that wants to listen to the spark context and publish events could use this, and I'd also consider writing one for the YARN-913 registry service, so that the URLs of the web UI would be locatable through that (low priority; would make more sense if integrated with a REST client).
There's no minicluster test. Given the test execution overhead of setting up minicluster tests, it'd probably be better to add an extension service into one of the existing tests.
Author: Steve Loughran <stevel@hortonworks.com>
Closes#9182 from steveloughran/stevel/feature/SPARK-1537-service.
I have tried to address all the comments in pull request https://github.com/apache/spark/pull/2447.
Note that the second commit (using the new method in all internal code of all components) is quite intrusive and could be omitted.
Author: Jeroen Schot <jeroen.schot@surfsara.nl>
Closes#9767 from schot/master.
The existing `spark.memory.fraction` (default 0.75) gives the system 25% of the space to work with. For small heaps, this is not enough: e.g. default 1GB leaves only 250MB system memory. This is especially a problem in local mode, where the driver and executor are crammed in the same JVM. Members of the community have reported driver OOM's in such cases.
**New proposal.** We now reserve 300MB before taking the 75%. For 1GB JVMs, this leaves `(1024 - 300) * 0.75 = 543MB` for execution and storage. This is proposal (1) listed in the [JIRA](https://issues.apache.org/jira/browse/SPARK-12081).
Author: Andrew Or <andrew@databricks.com>
Closes#10081 from andrewor14/unified-memory-small-heaps.
Garbage collection triggers cleanups. If the driver JVM is huge and there is little memory pressure, we may never clean up shuffle files on executors. This is a problem for long-running applications (e.g. streaming).
Author: Andrew Or <andrew@databricks.com>
Closes#10070 from andrewor14/periodic-gc.
The solution is the save the RDD partitioner in a separate file in the RDD checkpoint directory. That is, `<checkpoint dir>/_partitioner`. In most cases, whether the RDD partitioner was recovered or not, does not affect the correctness, only reduces performance. So this solution makes a best-effort attempt to save and recover the partitioner. If either fails, the checkpointing is not affected. This makes this patch safe and backward compatible.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#9983 from tdas/SPARK-12004.
`JavaSerializerInstance.serialize` uses `ByteArrayOutputStream.toByteArray` to get the serialized data. `ByteArrayOutputStream.toByteArray` needs to copy the content in the internal array to a new array. However, since the array will be converted to `ByteBuffer` at once, we can avoid the memory copy.
This PR added `ByteBufferOutputStream` to access the protected `buf` and convert it to a `ByteBuffer` directly.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#10051 from zsxwing/SPARK-12060.
Avoid potential deadlock with a user app's shutdown hook thread by more narrowly synchronizing access to 'hooks'
Author: Sean Owen <sowen@cloudera.com>
Closes#10042 from srowen/SPARK-12049.
This change seems large, but most of it is just replacing `byte[]`
with `ByteBuffer` and `new byte[]` with `ByteBuffer.allocate()`,
since it changes the network library's API.
The following are parts of the code that actually have meaningful
changes:
- The Message implementations were changed to inherit from a new
AbstractMessage that can optionally hold a reference to a body
(in the form of a ManagedBuffer); this is similar to how
ResponseWithBody worked before, except now it's not restricted
to just responses.
- The TransportFrameDecoder was pretty much rewritten to avoid
copies as much as possible; it doesn't rely on CompositeByteBuf
to accumulate incoming data anymore, since CompositeByteBuf
has issues when slices are retained. The code now is able to
create frames without having to resort to copying bytes except
for a few bytes (containing the frame length) in very rare cases.
- Some minor changes in the SASL layer to convert things back to
`byte[]` since the JDK SASL API operates on those.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9987 from vanzin/SPARK-12007.
This reverts commit cc243a079b / PR #9297
I'm reverting this because it broke SQLListenerMemoryLeakSuite in the master Maven builds.
See #9991 for a discussion of why this broke the tests.
This PR improve the performance of CartesianProduct by caching the result of right plan.
After this patch, the query time of TPC-DS Q65 go down to 4 seconds from 28 minutes (420X faster).
cc nongli
Author: Davies Liu <davies@databricks.com>
Closes#9969 from davies/improve_cartesian.
Top is implemented in terms of takeOrdered, which already maintains the
order, so top should, too.
Author: Wieland Hoffmann <themineo@gmail.com>
Closes#10013 from mineo/top-order.
In the previous implementation, the driver needs to know the executor listening address to send the thread dump request. However, in Netty RPC, the executor doesn't listen to any port, so the executor thread dump feature is broken.
This patch makes the driver use the endpointRef stored in BlockManagerMasterEndpoint to send the thread dump request to fix it.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#9976 from zsxwing/executor-thread-dump.
In the previous codes, `newDaemonCachedThreadPool` uses `SynchronousQueue`, which is wrong. `SynchronousQueue` is an empty queue that cannot cache any task. This patch uses `LinkedBlockingQueue` to fix it along with other fixes to make sure `newDaemonCachedThreadPool` can use at most `maxThreadNumber` threads, and after that, cache tasks to `LinkedBlockingQueue`.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#9978 from zsxwing/cached-threadpool.
On the live web UI, there is a SQL tab which provides valuable information for the SQL query. But once the workload is finished, we won't see the SQL tab on the history server. It will be helpful if we support SQL UI on the history server so we can analyze it even after its execution.
To support SQL UI on the history server:
1. I added an `onOtherEvent` method to the `SparkListener` trait and post all SQL related events to the same event bus.
2. Two SQL events `SparkListenerSQLExecutionStart` and `SparkListenerSQLExecutionEnd` are defined in the sql module.
3. The new SQL events are written to event log using Jackson.
4. A new trait `SparkHistoryListenerFactory` is added to allow the history server to feed events to the SQL history listener. The SQL implementation is loaded at runtime using `java.util.ServiceLoader`.
Author: Carson Wang <carson.wang@intel.com>
Closes#9297 from carsonwang/SqlHistoryUI.
This change does a couple of different things to make sure that the RpcEnv-level
code and the network library agree about the status of outstanding RPCs.
For RPCs that do not expect a reply ("RpcEnv.send"), support for one way
messages (hello CORBA!) was added to the network layer. This is a
"fire and forget" message that does not require any state to be kept
by the TransportClient; as a result, the RpcEnv 'Ack' message is not needed
anymore.
For RPCs that do expect a reply ("RpcEnv.ask"), the network library now
returns the internal RPC id; if the RpcEnv layer decides to time out the
RPC before the network layer does, it now asks the TransportClient to
forget about the RPC, so that if the network-level timeout occurs, the
client is not killed.
As part of implementing the above, I cleaned up some of the code in the
netty rpc backend, removing types that were not necessary and factoring
out some common code. Of interest is a slight change in the exceptions
when posting messages to a stopped RpcEnv; that's mostly to avoid nasty
error messages from the local-cluster backend when shutting down, which
pollutes the terminal output.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9917 from vanzin/SPARK-11866.
`ExecutorAdded` can only be sent to `AppClient` when worker report back the executor state as `LOADING`, otherwise because of concurrency issue, `AppClient` will possibly receive `ExectuorAdded` at first, then `ExecutorStateUpdated` with `LOADING` state.
Also Master will change the executor state from `LAUNCHING` to `RUNNING` (`AppClient` report back the state as `RUNNING`), then to `LOADING` (worker report back to state as `LOADING`), it should be `LAUNCHING` -> `LOADING` -> `RUNNING`.
Also it is wrongly shown in master UI, the state of executor should be `RUNNING` rather than `LOADING`:
![screen shot 2015-09-11 at 2 30 28 pm](https://cloud.githubusercontent.com/assets/850797/9809254/3155d840-5899-11e5-8cdf-ad06fef75762.png)
Author: jerryshao <sshao@hortonworks.com>
Closes#8714 from jerryshao/SPARK-10558.
Currently the Web UI navbar has a minimum width of 1200px; so if a window is resized smaller than that the app name goes off screen. The 1200px width seems to have been chosen since it fits the longest example app name without wrapping.
To work with smaller window widths I made the tabs wrap since it looked better than wrapping the app name. This is a distinct change in how the navbar looks and I'm not sure if it's what we actually want to do.
Other notes:
- min-width set to 600px to keep the tabs from wrapping individually (will need to be adjusted if tabs are added)
- app name will also wrap (making three levels) if a really really long app name is used
Author: Alex Bozarth <ajbozart@us.ibm.com>
Closes#9874 from ajbozarth/spark10864.
deleting the temp dir like that
```
scala> import scala.collection.mutable
import scala.collection.mutable
scala> val a = mutable.Set(1,2,3,4,7,0,8,98,9)
a: scala.collection.mutable.Set[Int] = Set(0, 9, 1, 2, 3, 7, 4, 8, 98)
scala> a.foreach(x => {a.remove(x) })
scala> a.foreach(println(_))
98
```
You may not modify a collection while traversing or iterating over it.This can not delete all element of the collection
Author: Zhongshuai Pei <peizhongshuai@huawei.com>
Closes#9951 from DoingDone9/Bug_RemainDir.
- NettyRpcEnv::openStream() now correctly propagates errors to
the read side of the pipe.
- NettyStreamManager now throws if the file being transferred does
not exist.
- The network library now correctly handles zero-sized streams.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9941 from vanzin/SPARK-11956.
This issue was addressed in https://github.com/apache/spark/pull/5494, but the fix in that PR, while safe in the sense that it will prevent the SparkContext from shutting down, misses the actual bug. The intent of `submitMissingTasks` should be understood as "submit the Tasks that are missing for the Stage, and run them as part of the ActiveJob identified by jobId". Because of a long-standing bug, the `jobId` parameter was never being used. Instead, we were trying to use the jobId with which the Stage was created -- which may no longer exist as an ActiveJob, hence the crash reported in SPARK-6880.
The correct fix is to use the ActiveJob specified by the supplied jobId parameter, which is guaranteed to exist at the call sites of submitMissingTasks.
This fix should be applied to all maintenance branches, since it has existed since 1.0.
kayousterhout pankajarora12
Author: Mark Hamstra <markhamstra@gmail.com>
Author: Imran Rashid <irashid@cloudera.com>
Closes#6291 from markhamstra/SPARK-6880.
After calling spill() on SortedIterator, the array inside InMemorySorter is not needed, it should be freed during spilling, this could help to join multiple tables with limited memory.
Author: Davies Liu <davies@databricks.com>
Closes#9793 from davies/free_array.
In the default Spark distribution, there are currently two separate
log4j config files, with different default values for the root logger,
so that when running the shell you have a different default log level.
This makes the shell more usable, since the logs don't overwhelm the
output.
But if you install a custom log4j.properties, you lose that, because
then it's going to be used no matter whether you're running a regular
app or the shell.
With this change, the overriding of the log level is done differently;
the log level repl's main class (org.apache.spark.repl.Main) is used
to define the root logger's level when running the shell, defaulting
to WARN if it's not set explicitly.
On a somewhat related change, the shell output about the "sc" variable
was changed a bit to contain a little more useful information about
the application, since when the root logger's log level is WARN, that
information is never shown to the user.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9816 from vanzin/shell-logging.
Currently pivot's signature looks like
```scala
scala.annotation.varargs
def pivot(pivotColumn: Column, values: Column*): GroupedData
scala.annotation.varargs
def pivot(pivotColumn: String, values: Any*): GroupedData
```
I think we can remove the one that takes "Column" types, since callers should always be passing in literals. It'd also be more clear if the values are not varargs, but rather Seq or java.util.List.
I also made similar changes for Python.
Author: Reynold Xin <rxin@databricks.com>
Closes#9929 from rxin/SPARK-11946.
This is continuation of SPARK-11761
Andrew suggested adding this protection. See tail of https://github.com/apache/spark/pull/9741
Author: tedyu <yuzhihong@gmail.com>
Closes#9852 from tedyu/master.
Based on feedback from Matei, this is more consistent with mapPartitions in Spark.
Also addresses some of the cleanups from a previous commit that renames the type variables.
Author: Reynold Xin <rxin@databricks.com>
Closes#9919 from rxin/SPARK-11933.
This change abstracts the code that serves jars / files to executors so that
each RpcEnv can have its own implementation; the akka version uses the existing
HTTP-based file serving mechanism, while the netty versions uses the new
stream support added to the network lib, which makes file transfers benefit
from the easier security configuration of the network library, and should also
reduce overhead overall.
The change includes a small fix to TransportChannelHandler so that it propagates
user events to downstream handlers.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9530 from vanzin/SPARK-11140.
1. Renamed map to mapGroup, flatMap to flatMapGroup.
2. Renamed asKey -> keyAs.
3. Added more documentation.
4. Changed type parameter T to V on GroupedDataset.
5. Added since versions for all functions.
Author: Reynold Xin <rxin@databricks.com>
Closes#9880 from rxin/SPARK-11899.
This mainly moves SqlNewHadoopRDD to the sql package. There is some state that is
shared between core and I've left that in core. This allows some other associated
minor cleanup.
Author: Nong Li <nong@databricks.com>
Closes#9845 from nongli/spark-11787.
Don't log ERROR messages when executors are explicitly killed or when
the exit reason is not yet known.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9780 from vanzin/SPARK-11789.
…xceptions are thrown during executor shutdown
This commit will make sure that when uncaught exceptions are prepended with [Container in shutdown] when JVM is shutting down.
Author: Srinivasa Reddy Vundela <vsr@cloudera.com>
Closes#9809 from vundela/master_11799.
This PR includes the following change:
1. Bind NettyRpcEnv to the specified host
2. Fix the port information in the log for NettyRpcEnv.
3. Fix the service name of NettyRpcEnv.
Author: zsxwing <zsxwing@gmail.com>
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#9821 from zsxwing/SPARK-11830.
This patch adds an alternate to the Parquet RecordReader from the parquet-mr project
that is much faster for flat schemas. Instead of using the general converter mechanism
from parquet-mr, this directly uses the lower level APIs from parquet-columnar and a
customer RecordReader that directly assembles into UnsafeRows.
This is optionally disabled and only used for supported schemas.
Using the tpcds store sales table and doing a sum of increasingly more columns, the results
are:
For 1 Column:
Before: 11.3M rows/second
After: 18.2M rows/second
For 2 Columns:
Before: 7.2M rows/second
After: 11.2M rows/second
For 5 Columns:
Before: 2.9M rows/second
After: 4.5M rows/second
Author: Nong Li <nong@databricks.com>
Closes#9774 from nongli/parquet.
The HP Fortify Opens Source Review team (https://www.hpfod.com/open-source-review-project) reported a handful of potential resource leaks that were discovered using their static analysis tool. We should fix the issues identified by their scan.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9455 from JoshRosen/fix-potential-resource-leaks.
[SPARK-6028](https://issues.apache.org/jira/browse/SPARK-6028) uses network module to implement RPC. However, there are some configurations named with `spark.shuffle` prefix in the network module.
This PR refactors them to make sure the user can control them in shuffle and RPC separately. The user can use `spark.rpc.*` to set the configuration for netty RPC.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#9481 from zsxwing/SPARK-10745.
Based on my conversions with people, I believe the consensus is that the coarse-grained mode is more stable and easier to reason about. It is best to use that as the default rather than the more flaky fine-grained mode.
Author: Reynold Xin <rxin@databricks.com>
Closes#9795 from rxin/SPARK-11809.
Currently streaming foreachRDD Java API uses a function prototype requiring a return value of null. This PR deprecates the old method and uses VoidFunction to allow for more concise declaration. Also added VoidFunction2 to Java API in order to use in Streaming methods. Unit test is added for using foreachRDD with VoidFunction, and changes have been tested with Java 7 and Java 8 using lambdas.
Author: Bryan Cutler <bjcutler@us.ibm.com>
Closes#9488 from BryanCutler/foreachRDD-VoidFunction-SPARK-4557.
https://issues.apache.org/jira/browse/SPARK-11792
The main changes include:
* Renaming `SizeEstimation` to `KnownSizeEstimation`. Hopefully this new name has more information.
* Making `estimatedSize` return `Long` instead of `Option[Long]`.
* In `UnsaveHashedRelation`, `estimatedSize` will delegate the work to `SizeEstimator` if we have not created a `BytesToBytesMap`.
Since we will put `UnsaveHashedRelation` to `BlockManager`, it is generally good to let it provide a more accurate size estimation. Also, if we do not put `BytesToBytesMap` directly into `BlockerManager`, I feel it is not really necessary to make `BytesToBytesMap` extends `KnownSizeEstimation`.
Author: Yin Huai <yhuai@databricks.com>
Closes#9813 from yhuai/SPARK-11792-followup.
Make sure we are using the context classloader when deserializing failed TaskResults instead of the Spark classloader.
The issue is that `enqueueFailedTask` was using the incorrect classloader which results in `ClassNotFoundException`.
Adds a test in TaskResultGetterSuite that compiles a custom exception, throws it on the executor, and asserts that Spark handles the TaskResult deserialization instead of returning `UnknownReason`.
See #9367 for previous comments
See SPARK-11195 for a full repro
Author: Hurshal Patel <hpatel516@gmail.com>
Closes#9779 from choochootrain/spark-11195-master.
See discussion toward the tail of https://github.com/apache/spark/pull/9723
From zsxwing :
```
The user should not call stop or other long-time work in a listener since it will block the listener thread, and prevent from stopping SparkContext/StreamingContext.
I cannot see an approach since we need to stop the listener bus's thread before stopping SparkContext/StreamingContext totally.
```
Proposed solution is to prevent the call to StreamingContext#stop() in the listener bus's thread.
Author: tedyu <yuzhihong@gmail.com>
Closes#9741 from tedyu/master.
This PR upgrade the version of RoaringBitmap to 0.5.10, to optimize the memory layout, will be much smaller when most of blocks are empty.
This PR is based on #9661 (fix conflicts), see all of the comments at https://github.com/apache/spark/pull/9661 .
Author: Kent Yao <yaooqinn@hotmail.com>
Author: Davies Liu <davies@databricks.com>
Author: Charles Allen <charles@allen-net.com>
Closes#9746 from davies/roaring_mapstatus.
Fix the serialization of RoaringBitmap with Kyro serializer
This PR came from https://github.com/metamx/spark/pull/1, thanks to drcrallen
Author: Davies Liu <davies@databricks.com>
Author: Charles Allen <charles@allen-net.com>
Closes#9748 from davies/SPARK-11016.
By using the dynamic allocation, sometimes it occurs false killing for those busy executors. Some executors with assignments will be killed because of being idle for enough time (say 60 seconds). The root cause is that the Task-Launch listener event is asynchronized.
For example, some executors are under assigning tasks, but not sending out the listener notification yet. Meanwhile, the dynamic allocation's executor idle time is up (e.g., 60 seconds). It will trigger killExecutor event at the same time.
1. the timer expiration starts before the listener event arrives.
2. Then, the task is going to run on top of that killed/killing executor. It will lead to task failure finally.
Here is the proposal to fix it. We can add the force control for killExecutor. If the force control is not set (i.e., false), we'd better to check if the executor under killing is idle or busy. If the current executor has some assignment, we should not kill that executor and return back false (to indicate killing failure). In dynamic allocation, we'd better to turn off force killing (i.e., force = false), we will meet killing failure if tries to kill a busy executor. And then, the executor timer won't be invalid. Later on, the task assignment event arrives, we can remove the idle timer accordingly. So that we can avoid false killing for those busy executors in dynamic allocation.
For the rest of usages, the end users can decide if to use force killing or not by themselves. If to turn on that option, the killExecutor will do the action without any status checking.
Author: Grace <jie.huang@intel.com>
Author: Andrew Or <andrew@databricks.com>
Author: Jie Huang <jie.huang@intel.com>
Closes#7888 from GraceH/forcekill.
There events happen normally during the app's lifecycle, so printing
out ERROR logs all the time is misleading, and can actually affect usability
of interactive shells.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9772 from vanzin/SPARK-11786.
Set s3a credentials when creating a new default hadoop configuration.
Author: Chris Bannister <chris.bannister@swiftkey.com>
Closes#9663 from Zariel/set-s3a-creds.
Currently if dynamic allocation is enabled, explicitly killing executor will not get response, so the executor metadata is wrong in driver side. Which will make dynamic allocation on Yarn fail to work.
The problem is `disableExecutor` returns false for pending killing executors when `onDisconnect` is detected, so no further implementation is done.
One solution is to bypass these explicitly killed executors to use `super.onDisconnect` to remove executor. This is simple.
Another solution is still querying the loss reason for these explicitly kill executors. Since executor may get killed and informed in the same AM-RM communication, so current way of adding pending loss reason request is not worked (container complete is already processed), here we should store this loss reason for later query.
Here this PR chooses solution 2.
Please help to review. vanzin I think this part is changed by you previously, would you please help to review? Thanks a lot.
Author: jerryshao <sshao@hortonworks.com>
Closes#9684 from jerryshao/SPARK-11718.
When computing partition for non-parquet relation, `HadoopRDD.compute` is used. but it does not set the thread local variable `inputFileName` in `NewSqlHadoopRDD`, like `NewSqlHadoopRDD.compute` does.. Yet, when getting the `inputFileName`, `NewSqlHadoopRDD.inputFileName` is exptected, which is empty now.
Adding the setting inputFileName in HadoopRDD.compute resolves this issue.
Author: xin Wu <xinwu@us.ibm.com>
Closes#9542 from xwu0226/SPARK-11522.
The basic idea is that:
The archive of the SparkR package itself, that is sparkr.zip, is created during build process and is contained in the Spark binary distribution. No change to it after the distribution is installed as the directory it resides ($SPARK_HOME/R/lib) may not be writable.
When there is R source code contained in jars or Spark packages specified with "--jars" or "--packages" command line option, a temporary directory is created by calling Utils.createTempDir() where the R packages built from the R source code will be installed. The temporary directory is writable, and won't interfere with each other when there are multiple SparkR sessions, and will be deleted when this SparkR session ends. The R binary packages installed in the temporary directory then are packed into an archive named rpkg.zip.
sparkr.zip and rpkg.zip are distributed to the cluster in YARN modes.
The distribution of rpkg.zip in Standalone modes is not supported in this PR, and will be address in another PR.
Various R files are updated to accept multiple lib paths (one is for SparkR package, the other is for other R packages) so that these package can be accessed in R.
Author: Sun Rui <rui.sun@intel.com>
Closes#9390 from sun-rui/SPARK-10500.
On driver process start up, UserGroupInformation.loginUserFromKeytab is called with the principal and keytab passed in, and therefore static var UserGroupInfomation,loginUser is set to that principal with kerberos credentials saved in its private credential set, and all threads within the driver process are supposed to see and use this login credentials to authenticate with Hive and Hadoop. However, because of IsolatedClientLoader, UserGroupInformation class is not shared for hive metastore clients, and instead it is loaded separately and of course not able to see the prepared kerberos login credentials in the main thread.
The first proposed fix would cause other classloader conflict errors, and is not an appropriate solution. This new change does kerberos login during hive client initialization, which will make credentials ready for the particular hive client instance.
yhuai Please take a look and let me know. If you are not the right person to talk to, could you point me to someone responsible for this?
Author: Yu Gao <ygao@us.ibm.com>
Author: gaoyu <gaoyu@gaoyu-macbookpro.roam.corp.google.com>
Author: Yu Gao <crystalgaoyu@gmail.com>
Closes#9272 from yolandagao/master.
Also introduces new spark private API in RDD.scala with name 'mapPartitionsInternal' which doesn't closure cleans the RDD elements.
Author: nitin goyal <nitin.goyal@guavus.com>
Author: nitin.goyal <nitin.goyal@guavus.com>
Closes#9253 from nitin2goyal/master.
Currently, all the shuffle writer will write to target path directly, the file could be corrupted by other attempt of the same partition on the same executor. They should write to temporary file then rename to target path, as what we do in output committer. In order to make the rename atomic, the temporary file should be created in the same local directory (FileSystem).
This PR is based on #9214 , thanks to squito . Closes#9214
Author: Davies Liu <davies@databricks.com>
Closes#9610 from davies/safe_shuffle.
TODO
- [x] Add Java API
- [x] Add API tests
- [x] Add a function test
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#9636 from zsxwing/java-track.
This helps debug issues caused by multiple SparkContext instances. JoshRosen andrewor14
~~~
scala> sc.stop()
scala> sc.parallelize(0 until 10)
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:
org.apache.spark.SparkContext.<init>(SparkContext.scala:82)
org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:1017)
$iwC$$iwC.<init>(<console>:9)
$iwC.<init>(<console>:18)
<init>(<console>:20)
.<init>(<console>:24)
.<clinit>(<console>)
.<init>(<console>:7)
.<clinit>(<console>)
$print(<console>)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
The active context was created at:
(No active SparkContext.)
~~~
Author: Xiangrui Meng <meng@databricks.com>
Closes#9675 from mengxr/SPARK-11709.
The stop() callback was trying to close the launcher connection in the
same thread that handles connection data, which ended up causing a
deadlock. So avoid that by dispatching the stop() request in its own
thread.
On top of that, add some exception safety to a few parts of the code,
and use "destroyForcibly" from Java 8 if it's available, to force
kill the child process. The flip side is that "kill()" may not actually
work if running Java 7.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9633 from vanzin/SPARK-11655.
This is a followup for #9317 to replace volatile fields with AtomicBoolean and AtomicReference.
Author: Reynold Xin <rxin@databricks.com>
Closes#9611 from rxin/SPARK-10827.
This patch modifies Spark's closure cleaner (and a few other places) to use ASM 5, which is necessary in order to support cleaning of closures that were compiled by Java 8.
In order to avoid ASM dependency conflicts, Spark excludes ASM from all of its dependencies and uses a shaded version of ASM 4 that comes from `reflectasm` (see [SPARK-782](https://issues.apache.org/jira/browse/SPARK-782) and #232). This patch updates Spark to use a shaded version of ASM 5.0.4 that was published by the Apache XBean project; the POM used to create the shaded artifact can be found at https://github.com/apache/geronimo-xbean/blob/xbean-4.4/xbean-asm5-shaded/pom.xml.
http://movingfulcrum.tumblr.com/post/80826553604/asm-framework-50-the-missing-migration-guide was a useful resource while upgrading the code to use the new ASM5 opcodes.
I also added a new regression tests in the `java8-tests` subproject; the existing tests were insufficient to catch this bug, which only affected Scala 2.11 user code which was compiled targeting Java 8.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9512 from JoshRosen/SPARK-6152.
If it returns Text, we can reuse this in Spark SQL to provide a WholeTextFile data source and directly convert the Text into UTF8String without extra string decoding and encoding.
Author: Reynold Xin <rxin@databricks.com>
Closes#9622 from rxin/SPARK-11646.
Currently, when a DStream sets the scope for RDD generated by it, that scope is not allowed to be overridden by the RDD operations. So in case of `DStream.foreachRDD`, all the RDDs generated inside the foreachRDD get the same scope - `foreachRDD <time>`, as set by the `ForeachDStream`. So it is hard to debug generated RDDs in the RDD DAG viz in the Spark UI.
This patch allows the RDD operations inside `DStream.transform` and `DStream.foreachRDD` to append their own scopes to the earlier DStream scope.
I have also slightly tweaked how callsites are set such that the short callsite reflects the RDD operation name and line number. This tweak is necessary as callsites are not managed through scopes (which support nesting and overriding) and I didnt want to add another local property to control nesting and overriding of callsites.
## Before:
![image](https://cloud.githubusercontent.com/assets/663212/10808548/fa71c0c4-7da9-11e5-9af0-5737793a146f.png)
## After:
![image](https://cloud.githubusercontent.com/assets/663212/10808659/37bc45b6-7dab-11e5-8041-c20be6a9bc26.png)
The code that was used to generate this is:
```
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD { rdd =>
val temp = rdd.map { _ -> 1 }.reduceByKey( _ + _)
val temp2 = temp.map { _ -> 1}.reduceByKey(_ + _)
val count = temp2.count
println(count)
}
```
Note
- The inner scopes of the RDD operations map/reduceByKey inside foreachRDD is visible
- The short callsites of stages refers to the line number of the RDD ops rather than the same line number of foreachRDD in all three cases.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#9315 from tdas/SPARK-11361.
See http://search-hadoop.com/m/q3RTtjpe8r1iRbTj2 for discussion.
Summary: addition of VisibleForTesting annotation resulted in spark-shell malfunctioning.
Author: tedyu <yuzhihong@gmail.com>
Closes#9585 from tedyu/master.
As vonnagy reported in the following thread:
http://search-hadoop.com/m/q3RTtk982kvIow22
Attempts to join the thread in AsynchronousListenerBus resulted in lock up because AsynchronousListenerBus thread was still getting messages `SparkListenerExecutorMetricsUpdate` from the DAGScheduler
Author: tedyu <yuzhihong@gmail.com>
Closes#9546 from ted-yu/master.
Changed AppClient to be non-blocking in `receiveAndReply` by using a separate thread to wait for response and reply to the context. The threads are managed by a thread pool. Also added unit tests for the AppClient interface.
Author: Bryan Cutler <bjcutler@us.ibm.com>
Closes#9317 from BryanCutler/appClient-receiveAndReply-SPARK-10827.
with yarn's external shuffle, ExternalShuffleClient of executors reserve its connections for yarn's NodeManager until application has been completed. so it will make NodeManager and executors have many socket connections.
in order to reduce network pressure of NodeManager's shuffleService, after registerWithShuffleServer or fetchBlocks have been completed in ExternalShuffleClient, connection for NM's shuffleService needs to be closed.andrewor14 rxin vanzin
Author: Lianhui Wang <lianhuiwang09@gmail.com>
Closes#9227 from lianhuiwang/spark-11252.
this change rejects offers for slaves with unmet constraints for 120s to mitigate offer starvation.
this prevents mesos to send us these offers again and again.
in return, we get more offers for slaves which might meet our constraints.
and it enables mesos to send the rejected offers to other frameworks.
Author: Felix Bechstein <felix.bechstein@otto.de>
Closes#8639 from felixb/decline_offers_constraint_mismatch.
As shown in https://amplab.cs.berkeley.edu/jenkins/view/Spark-QA-Compile/job/Spark-Master-Scala211-Compile/1946/console , compilation fails with:
```
[error] /home/jenkins/workspace/Spark-Master-Scala211-Compile/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala:25: in class RDDInfo, multiple overloaded alternatives of constructor RDDInfo define default arguments.
[error] class RDDInfo(
[error]
```
This PR tries to fix the compilation error
Author: tedyu <yuzhihong@gmail.com>
Closes#9538 from tedyu/master.
A few changes:
1. Removed fold, since it can be confusing for distributed collections.
2. Created specific interfaces for each Dataset function (e.g. MapFunction, ReduceFunction, MapPartitionsFunction)
3. Added more documentation and test cases.
The other thing I'm considering doing is to have a "collector" interface for FlatMapFunction and MapPartitionsFunction, similar to MapReduce's map function.
Author: Reynold Xin <rxin@databricks.com>
Closes#9531 from rxin/SPARK-11564.
In order to lay the groundwork for proper off-heap memory support in SQL / Tungsten, we need to extend our MemoryManager to perform bookkeeping for off-heap memory.
## User-facing changes
This PR introduces a new configuration, `spark.memory.offHeapSize` (name subject to change), which specifies the absolute amount of off-heap memory that Spark and Spark SQL can use. If Tungsten is configured to use off-heap execution memory for allocating data pages, then all data page allocations must fit within this size limit.
## Internals changes
This PR contains a lot of internal refactoring of the MemoryManager. The key change at the heart of this patch is the introduction of a `MemoryPool` class (name subject to change) to manage the bookkeeping for a particular category of memory (storage, on-heap execution, and off-heap execution). These MemoryPools are not fixed-size; they can be dynamically grown and shrunk according to the MemoryManager's policies. In StaticMemoryManager, these pools have fixed sizes, proportional to the legacy `[storage|shuffle].memoryFraction`. In the new UnifiedMemoryManager, the sizes of these pools are dynamically adjusted according to its policies.
There are two subclasses of `MemoryPool`: `StorageMemoryPool` manages storage memory and `ExecutionMemoryPool` manages execution memory. The MemoryManager creates two execution pools, one for on-heap memory and one for off-heap. Instances of `ExecutionMemoryPool` manage the logic for fair sharing of their pooled memory across running tasks (in other words, the ShuffleMemoryManager-like logic has been moved out of MemoryManager and pushed into these ExecutionMemoryPool instances).
I think that this design is substantially easier to understand and reason about than the previous design, where most of these responsibilities were handled by MemoryManager and its subclasses. To see this, take at look at how simple the logic in `UnifiedMemoryManager` has become: it's now very easy to see when memory is dynamically shifted between storage and execution.
## TODOs
- [x] Fix handful of test failures in the MemoryManagerSuites.
- [x] Fix remaining TODO comments in code.
- [ ] Document new configuration.
- [x] Fix commented-out tests / asserts:
- [x] UnifiedMemoryManagerSuite.
- [x] Write tests that exercise the new off-heap memory management policies.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9344 from JoshRosen/offheap-memory-accounting.
https://issues.apache.org/jira/browse/SPARK-10116
This is really trivial, just happened to notice it -- if `XORShiftRandom.hashSeed` is really supposed to have random bits throughout (as the comment implies), it needs to do something for the conversion to `long`.
mengxr mkolod
Author: Imran Rashid <irashid@cloudera.com>
Closes#8314 from squito/SPARK-10116.
This brings the support of off-heap memory for array inside BytesToBytesMap and InMemorySorter, then we could allocate all the memory from off-heap for execution.
Closes#8068
Author: Davies Liu <davies@databricks.com>
Closes#9477 from davies/unsafe_timsort.
Use the proxyBase set by the AM, if not found then use env. This is to fix the issue if somebody accidentally set APPLICATION_WEB_PROXY_BASE to wrong proxyBase
Author: Srinivasa Reddy Vundela <vsr@cloudera.com>
Closes#9448 from vundela/master.
spark.rpc is supposed to be configurable but is not currently (doesn't get propagated to executors because RpcEnv.create is done before driver properties are fetched).
Author: Nishkam Ravi <nishkamravi@gmail.com>
Closes#9460 from nishkamravi2/master_akka.
```PortableDataStream``` maintains some internal state. This makes it tricky to reuse a stream (one needs to call ```close``` on both the ```PortableDataStream``` and the ```InputStream``` it produces).
This PR removes all state from ```PortableDataStream``` and effectively turns it into an ```InputStream```/```Array[Byte]``` factory. This makes the user responsible for managing the ```InputStream``` it returns.
cc srowen
Author: Herman van Hovell <hvanhovell@questtec.nl>
Closes#9417 from hvanhovell/SPARK-11449.
After aggregation, the dataset could be smaller than inputs, so it's better to do hash based aggregation for all inputs, then using sort based aggregation to merge them.
Author: Davies Liu <davies@databricks.com>
Closes#9383 from davies/fix_switch.
OutputCommitCoordinator uses a map in a place where an array would suffice, increasing its memory consumption for result stages with millions of tasks.
This patch replaces that map with an array. The only tricky part of this is reasoning about the range of possible array indexes in order to make sure that we never index out of bounds.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9274 from JoshRosen/SPARK-11307.
Since we have 4 bytes as number of records in the beginning of a page, the address can not be zero, so we do not need the bitset.
For performance concerns, the bitset could help speed up false lookup if the slot is empty (because bitset is smaller than longArray, cache hit rate will be higher). In practice, the map is filled with 35% - 70% (use 50% as average), so only half of the false lookups can benefit of it, all others will pay the cost of load the bitset (still need to access the longArray anyway).
For aggregation, we always need to access the longArray (insert a new key after false lookup), also confirmed by a benchmark.
For broadcast hash join, there could be a regression, but a simple benchmark showed that it may not (most of lookup are false):
```
sqlContext.range(1<<20).write.parquet("small")
df = sqlContext.read.parquet('small')
for i in range(3):
t = time.time()
df2 = sqlContext.range(1<<26).selectExpr("id * 1111111111 % 987654321 as id2")
df2.join(df, df.id == df2.id2).count()
print time.time() -t
```
Having bitset (used time in seconds):
```
17.5404241085
10.2758829594
10.5786800385
```
After removing bitset (used time in seconds):
```
21.8939979076
12.4132959843
9.97224712372
```
cc rxin nongli
Author: Davies Liu <davies@databricks.com>
Closes#9452 from davies/remove_bitset.
This is an updated version of #8995 by a-roberts. Original description follows:
Snappy now supports concatenation of serialized streams, this patch contains a version number change and the "does not support" test is now a "supports" test.
Snappy 1.1.2 changelog mentions:
> snappy-java-1.1.2 (22 September 2015)
> This is a backward compatible release for 1.1.x.
> Add AIX (32-bit) support.
> There is no upgrade for the native libraries of the other platforms.
> A major change since 1.1.1 is a support for reading concatenated results of SnappyOutputStream(s)
> snappy-java-1.1.2-RC2 (18 May 2015)
> Fix#107: SnappyOutputStream.close() is not idempotent
> snappy-java-1.1.2-RC1 (13 May 2015)
> SnappyInputStream now supports reading concatenated compressed results of SnappyOutputStream
> There has been no compressed format change since 1.0.5.x. So You can read the compressed results > interchangeablly between these versions.
> Fixes a problem when java.io.tmpdir does not exist.
Closes#8995.
Author: Adam Roberts <aroberts@uk.ibm.com>
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9439 from JoshRosen/update-snappy.
functions.scala was getting pretty long. I broke it into multiple files.
I also added explicit data types for some public vals, and renamed aggregate function pretty names to lower case, which is more consistent with rest of the functions.
Author: Reynold Xin <rxin@databricks.com>
Closes#9471 from rxin/SPARK-11505.
In YARN mode, when preemption is enabled, we may leave executors in a
zombie state while we wait to retrieve the reason for which the executor
exited. This is so that we don't account for failed tasks that were
running on a preempted executor.
The issue is that while we wait for this information, the scheduler
might decide to schedule tasks on the executor, which will never be
able to run them. Other side effects include the block manager still
considering the executor available to cache blocks, for example.
So, when we know that an executor went down but we don't know why,
stop everything related to the executor, except its running tasks.
Only when we know the reason for the exit (or give up waiting for
it) we do update the running tasks.
This is achieved by a new `disableExecutor()` method in the
`Schedulable` interface. For managers that do not behave like this
(i.e. every one but YARN), the existing `executorLost()` method
will behave the same way it did before.
On top of that change, a few minor changes that made debugging easier,
and fixed some other minor issues:
- The cluster-mode AM was printing a misleading log message every
time an executor disconnected from the driver (because the akka
actor system was shared between driver and AM).
- Avoid sending unnecessary requests for an executor's exit reason
when we already know it was explicitly disabled / killed. This
avoids both multiple requests, and unnecessary requests that would
just cause warning messages on the AM (in the explicit kill case).
- Tone down a log message about the executor being lost when it
exited normally (e.g. preemption)
- Wake up the AM monitor thread when requests for executor loss
reasons arrive too, so that we can more quickly remove executors
from this zombie state.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#8887 from vanzin/SPARK-10622.
The test functionality should be the same, but without using mockito; logs don't
really say anything useful but I suspect it may be the cause of the flakiness,
since updating mocks when multiple threads may be using it doesn't work very
well. It also allows some other cleanup (= less test code in FsHistoryProvider).
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9425 from vanzin/SPARK-11466.
DriverDescription refactored to case class because it included no mutable fields.
ApplicationDescription had one mutable field, which was appUiUrl. This field was set by the driver to point to the driver web UI. Master was modifying this field when the application was removed to redirect requests to history server. This was wrong because objects which are sent over the wire should be immutable. Now appUiUrl is immutable in ApplicationDescription and always points to the driver UI even if it is already shutdown. The UI url which master exposes to the user and modifies dynamically is now included into ApplicationInfo - a data object which describes the application state internally in master. That URL in ApplicationInfo is initialised with the value from ApplicationDescription.
ApplicationDescription also included value user, which is now a part of case class fields.
Author: Jacek Lewandowski <lewandowski.jacek@gmail.com>
Closes#9299 from jacek-lewandowski/SPARK-11344.
"Client mode" means the RPC env will not listen for incoming connections.
This allows certain processes in the Spark stack (such as Executors or
tha YARN client-mode AM) to act as pure clients when using the netty-based
RPC backend, reducing the number of sockets needed by the app and also the
number of open ports.
Client connections are also preferred when endpoints that actually have
a listening socket are involved; so, for example, if a Worker connects
to a Master and the Master needs to send a message to a Worker endpoint,
that client connection will be used, even though the Worker is also
listening for incoming connections.
With this change, the workaround for SPARK-10987 isn't necessary anymore, and
is removed. The AM connects to the driver in "client mode", and that connection
is used for all driver <-> AM communication, and so the AM is properly notified
when the connection goes down.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9210 from vanzin/SPARK-10997.
JIRA: https://issues.apache.org/jira/browse/SPARK-11271
As reported in the JIRA ticket, when there are too many tasks, the memory usage of MapStatus will cause problem. Use BitSet instead of RoaringBitMap should be more efficient in memory usage.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#9243 from viirya/mapstatus-bitset.
Use standard JDK APIs for that (with a little help from Guava). Most of the
changes here are in test code, since there were no tests specific to that
part of the code.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9257 from vanzin/SPARK-11073.
Large HDFS clusters may take a while to leave safe mode when starting; this change
makes the HS wait for that before doing checks about its configuraton. This means
the HS won't stop right away if HDFS is in safe mode and the configuration is not
correct, but that should be a very uncommon situation.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9043 from vanzin/SPARK-11020.
[SPARK-11338: HistoryPage not multi-tenancy enabled ...](https://issues.apache.org/jira/browse/SPARK-11338)
- `HistoryPage.scala` ...prepending all page links with the web proxy (`uiRoot`) path
- `HistoryServerSuite.scala` ...adding a test case to verify all site-relative links are prefixed when the environment variable `APPLICATION_WEB_PROXY_BASE` (or System property `spark.ui.proxyBase`) is set
Author: Christian Kadner <ckadner@us.ibm.com>
Closes#9291 from ckadner/SPARK-11338 and squashes the following commits:
01d2f35 [Christian Kadner] [SPARK-11338][WebUI] nit fixes
d054bd7 [Christian Kadner] [SPARK-11338][WebUI] prependBaseUri in method makePageLink
8bcb3dc [Christian Kadner] [SPARK-11338][WebUI] Prepend application links on HistoryPage with uiRoot path
**TL;DR**: We can rule out one rare but potential cause of input stream corruption via defensive programming.
## Background
[MAPREDUCE-5918](https://issues.apache.org/jira/browse/MAPREDUCE-5918) is a bug where an instance of a decompressor ends up getting placed into a pool multiple times. Since the pool is backed by a list instead of a set, this can lead to the same decompressor being used in different places at the same time, which is not safe because those decompressors will overwrite each other's buffers. Sometimes this buffer sharing will lead to exceptions but other times it will might silently result in invalid / garbled input.
That Hadoop bug is fixed in Hadoop 2.7 but is still present in many Hadoop versions that we wish to support. As a result, I think that we should try to work around this issue in Spark via defensive programming to prevent RecordReaders from being closed multiple times.
So far, I've had a hard time coming up with explanations of exactly how double-`close()`s occur in practice, but I do have a couple of explanations that work on paper.
For instance, it looks like https://github.com/apache/spark/pull/7424, added in 1.5, introduces at least one extremely~rare corner-case path where Spark could double-close() a LineRecordReader instance in a way that triggers the bug. Here are the steps involved in the bad execution that I brainstormed up:
* [The task has finished reading input, so we call close()](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L168).
* [While handling the close call and trying to close the reader, reader.close() throws an exception]( https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L190)
* We don't set `reader = null` after handling this exception, so the [TaskCompletionListener also ends up calling NewHadoopRDD.close()](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L156), which, in turn, closes the record reader again.
In this hypothetical situation, `LineRecordReader.close()` could [fail with an exception if its InputStream failed to close](https://github.com/apache/hadoop/blob/release-1.2.1/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java#L212).
I googled for "Exception in RecordReader.close()" and it looks like it's possible for a closed Hadoop FileSystem to trigger an error there: [SPARK-757](https://issues.apache.org/jira/browse/SPARK-757), [SPARK-2491](https://issues.apache.org/jira/browse/SPARK-2491)
Looking at [SPARK-3052](https://issues.apache.org/jira/browse/SPARK-3052), it seems like it's possible to get spurious exceptions there when there is an error reading from Hadoop. If the Hadoop FileSystem were to get into an error state _right_ after reading the last record then it looks like we could hit the bug here in 1.5.
## The fix
This patch guards against these issues by modifying `HadoopRDD.close()` and `NewHadoopRDD.close()` so that they set `reader = null` even if an exception occurs in the `reader.close()` call. In addition, I modified `NextIterator. closeIfNeeded()` to guard against double-close if the first `close()` call throws an exception.
I don't have an easy way to test this, since I haven't been able to reproduce the bug that prompted this patch, but these changes seem safe and seem to rule out the on-paper reproductions that I was able to brainstorm up.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9382 from JoshRosen/hadoop-decompressor-pooling-fix and squashes the following commits:
5ec97d7 [Josh Rosen] Add SqlNewHadoopRDD.unsetInputFileName() that I accidentally deleted.
ae46cf4 [Josh Rosen] Merge remote-tracking branch 'origin/master' into hadoop-decompressor-pooling-fix
087aa63 [Josh Rosen] Guard against double-close() of RecordReaders.
Since we do not need to preserve a page before calling compute(), MapPartitionsWithPreparationRDD is not needed anymore.
This PR basically revert #8543, #8511, #8038, #8011
Author: Davies Liu <davies@databricks.com>
Closes#9381 from davies/remove_prepare2.
See [SPARK-10986](https://issues.apache.org/jira/browse/SPARK-10986) for details.
This fixes the `ClassNotFoundException` for Spark classes in the serializer.
I am not sure this is the right way to handle the class loader, but I couldn't find any documentation on how the context class loader is used and who relies on it. It seems at least the serializer uses it to instantiate classes during deserialization.
I am open to suggestions (I tried this fix on a real Mesos cluster and it *does* fix the issue).
tnachen andrewor14
Author: Iulian Dragos <jaguarul@gmail.com>
Closes#9282 from dragos/issue/mesos-classloader.
This PR introduce a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is needed anymore, so removed.
Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but those could benefit from this (trigger others' spilling).
The PrepareRDD may be not needed anymore, could be removed in follow up PR.
The following script will fail with OOM before this PR, finished in 150 seconds with 2G heap (also works in 1.5 branch, with similar duration).
```python
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s")
df2 = df.select(df.id.alias('id2'), df.s.alias('s2'))
j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2")
j.explain()
print j.count()
```
For thread-safety, here what I'm got:
1) Without calling spill(), the operators should only be used by single thread, no safety problems.
2) spill() could be triggered in two cases, triggered by itself, or by other operators. we can check trigger == this in spill(), so it's still in the same thread, so safety problems.
3) if it's triggered by other operators (right now cache will not trigger spill()), we only spill the data into disk when it's in scanning stage (building is finished), so the in-memory sorter or memory pages are read-only, we only need to synchronize the iterator and change it.
4) During scanning, the iterator will only use one record in one page, we can't free this page, because the downstream is currently using it (used by UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page, and dump all others into disk. In UnsafeExternalSorter, we keep the page that is used by current record (having the same baseObject), free it when loading the next record. In ShuffleExternalSorter, the spill() will not trigger during scanning.
5) In order to avoid deadlock, we didn't call acquireMemory during spill (so we reused the pointer array in InMemorySorter).
Author: Davies Liu <davies@databricks.com>
Closes#9241 from davies/force_spill.