**TL;DR**: We can rule out one rare but potential cause of input stream corruption via defensive programming.
## Background
[MAPREDUCE-5918](https://issues.apache.org/jira/browse/MAPREDUCE-5918) is a bug where an instance of a decompressor ends up getting placed into a pool multiple times. Since the pool is backed by a list instead of a set, this can lead to the same decompressor being used in different places at the same time, which is not safe because those decompressors will overwrite each other's buffers. Sometimes this buffer sharing will lead to exceptions but other times it will might silently result in invalid / garbled input.
That Hadoop bug is fixed in Hadoop 2.7 but is still present in many Hadoop versions that we wish to support. As a result, I think that we should try to work around this issue in Spark via defensive programming to prevent RecordReaders from being closed multiple times.
So far, I've had a hard time coming up with explanations of exactly how double-`close()`s occur in practice, but I do have a couple of explanations that work on paper.
For instance, it looks like https://github.com/apache/spark/pull/7424, added in 1.5, introduces at least one extremely~rare corner-case path where Spark could double-close() a LineRecordReader instance in a way that triggers the bug. Here are the steps involved in the bad execution that I brainstormed up:
* [The task has finished reading input, so we call close()](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L168).
* [While handling the close call and trying to close the reader, reader.close() throws an exception]( https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L190)
* We don't set `reader = null` after handling this exception, so the [TaskCompletionListener also ends up calling NewHadoopRDD.close()](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L156), which, in turn, closes the record reader again.
In this hypothetical situation, `LineRecordReader.close()` could [fail with an exception if its InputStream failed to close](https://github.com/apache/hadoop/blob/release-1.2.1/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java#L212).
I googled for "Exception in RecordReader.close()" and it looks like it's possible for a closed Hadoop FileSystem to trigger an error there: [SPARK-757](https://issues.apache.org/jira/browse/SPARK-757), [SPARK-2491](https://issues.apache.org/jira/browse/SPARK-2491)
Looking at [SPARK-3052](https://issues.apache.org/jira/browse/SPARK-3052), it seems like it's possible to get spurious exceptions there when there is an error reading from Hadoop. If the Hadoop FileSystem were to get into an error state _right_ after reading the last record then it looks like we could hit the bug here in 1.5.
## The fix
This patch guards against these issues by modifying `HadoopRDD.close()` and `NewHadoopRDD.close()` so that they set `reader = null` even if an exception occurs in the `reader.close()` call. In addition, I modified `NextIterator. closeIfNeeded()` to guard against double-close if the first `close()` call throws an exception.
I don't have an easy way to test this, since I haven't been able to reproduce the bug that prompted this patch, but these changes seem safe and seem to rule out the on-paper reproductions that I was able to brainstorm up.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9382 from JoshRosen/hadoop-decompressor-pooling-fix and squashes the following commits:
5ec97d7 [Josh Rosen] Add SqlNewHadoopRDD.unsetInputFileName() that I accidentally deleted.
ae46cf4 [Josh Rosen] Merge remote-tracking branch 'origin/master' into hadoop-decompressor-pooling-fix
087aa63 [Josh Rosen] Guard against double-close() of RecordReaders.
Since we do not need to preserve a page before calling compute(), MapPartitionsWithPreparationRDD is not needed anymore.
This PR basically revert #8543, #8511, #8038, #8011
Author: Davies Liu <davies@databricks.com>
Closes#9381 from davies/remove_prepare2.
See [SPARK-10986](https://issues.apache.org/jira/browse/SPARK-10986) for details.
This fixes the `ClassNotFoundException` for Spark classes in the serializer.
I am not sure this is the right way to handle the class loader, but I couldn't find any documentation on how the context class loader is used and who relies on it. It seems at least the serializer uses it to instantiate classes during deserialization.
I am open to suggestions (I tried this fix on a real Mesos cluster and it *does* fix the issue).
tnachen andrewor14
Author: Iulian Dragos <jaguarul@gmail.com>
Closes#9282 from dragos/issue/mesos-classloader.
This PR introduce a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is needed anymore, so removed.
Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but those could benefit from this (trigger others' spilling).
The PrepareRDD may be not needed anymore, could be removed in follow up PR.
The following script will fail with OOM before this PR, finished in 150 seconds with 2G heap (also works in 1.5 branch, with similar duration).
```python
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s")
df2 = df.select(df.id.alias('id2'), df.s.alias('s2'))
j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2")
j.explain()
print j.count()
```
For thread-safety, here what I'm got:
1) Without calling spill(), the operators should only be used by single thread, no safety problems.
2) spill() could be triggered in two cases, triggered by itself, or by other operators. we can check trigger == this in spill(), so it's still in the same thread, so safety problems.
3) if it's triggered by other operators (right now cache will not trigger spill()), we only spill the data into disk when it's in scanning stage (building is finished), so the in-memory sorter or memory pages are read-only, we only need to synchronize the iterator and change it.
4) During scanning, the iterator will only use one record in one page, we can't free this page, because the downstream is currently using it (used by UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page, and dump all others into disk. In UnsafeExternalSorter, we keep the page that is used by current record (having the same baseObject), free it when loading the next record. In ShuffleExternalSorter, the spill() will not trigger during scanning.
5) In order to avoid deadlock, we didn't call acquireMemory during spill (so we reused the pointer array in InMemorySorter).
Author: Davies Liu <davies@databricks.com>
Closes#9241 from davies/force_spill.
Commit af3bc59d1f introduced new
functionality so that if an executor dies for a reason that's not
caused by one of the tasks running on the executor (e.g., due to
pre-emption), Spark doesn't count the failure towards the maximum
number of failures for the task. That commit introduced some vague
naming that this commit attempts to fix; in particular:
(1) The variable "isNormalExit", which was used to refer to cases where
the executor died for a reason unrelated to the tasks running on the
machine, has been renamed (and reversed) to "exitCausedByApp". The problem
with the existing name is that it's not clear (at least to me!) what it
means for an exit to be "normal"; the new name is intended to make the
purpose of this variable more clear.
(2) The variable "shouldEventuallyFailJob" has been renamed to
"countTowardsTaskFailures". This variable is used to determine whether
a task's failure should be counted towards the maximum number of failures
allowed for a task before the associated Stage is aborted. The problem
with the existing name is that it can be confused with implying that
the task's failure should immediately cause the stage to fail because it
is somehow fatal (this is the case for a fetch failure, for example: if
a task fails because of a fetch failure, there's no point in retrying,
and the whole stage should be failed).
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes#9164 from kayousterhout/SPARK-11178.
… ReceiverTracker and ReceiverSchedulingPolicy to use it
This PR includes the following changes:
1. Add a new preferred location format, `executor_<host>_<executorID>` (e.g., "executor_localhost_2"), to support specifying the executor locations for RDD.
2. Use the new preferred location format in `ReceiverTracker` to optimize the starting time of Receivers when there are multiple executors in a host.
The goal of this PR is to enable the streaming scheduler to place receivers (which run as tasks) in specific executors. Basically, I want to have more control on the placement of the receivers such that they are evenly distributed among the executors. We tried to do this without changing the core scheduling logic. But it does not allow specifying particular executor as preferred location, only at the host level. So if there are two executors in the same host, and I want two receivers to run on them (one on each executor), I cannot specify that. Current code only specifies the host as preference, which may end up launching both receivers on the same executor. We try to work around it but restarting a receiver when it does not launch in the desired executor and hope that next time it will be started in the right one. But that cause lots of restarts, and delays in correctly launching the receiver.
So this change, would allow the streaming scheduler to specify the exact executor as the preferred location. Also this is not exposed to the user, only the streaming scheduler uses this.
Author: zsxwing <zsxwing@gmail.com>
Closes#9181 from zsxwing/executor-location.
This commit fixes a bug where, in Standalone mode, if a task fails and crashes the JVM, the
failure is considered a "normal failure" (meaning it's considered unrelated to the task), so
the failure isn't counted against the task's maximum number of failures:
af3bc59d1f (diff-a755f3d892ff2506a7aa7db52022d77cL138).
As a result, if a task fails in a way that results in it crashing the JVM, it will continuously be
re-launched, resulting in a hang. This commit fixes that problem.
This bug was introduced by #8007; andrewor14 mccheah vanzin can you take a look at this?
This error is hard to trigger because we handle executor losses through 2 code paths (the second is via Akka, where Akka notices that the executor endpoint is disconnected). In my setup, the Akka code path completes first, and doesn't have this bug, so things work fine (see my recent email to the dev list about this). If I manually disable the Akka code path, I can see the hang (and this commit fixes the issue).
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes#9273 from kayousterhout/SPARK-11306.
The SizeEstimator keeps a cache of ClassInfos but this cache uses Class objects as keys.
Which results in strong references to the Class objects. If these classes are dynamically created
this prevents the corresponding ClassLoader from being GCed. Leading to PermGen exhaustion.
We use a Map with WeakKeys to prevent this issue.
Author: Sem Mulder <sem.mulder@site2mobile.com>
Closes#9244 from SemMulder/fix-sizeestimator-classunloading.
This patch refactors the MemoryManager class structure. After #9000, Spark had the following classes:
- MemoryManager
- StaticMemoryManager
- ExecutorMemoryManager
- TaskMemoryManager
- ShuffleMemoryManager
This is fairly confusing. To simplify things, this patch consolidates several of these classes:
- ShuffleMemoryManager and ExecutorMemoryManager were merged into MemoryManager.
- TaskMemoryManager is moved into Spark Core.
**Key changes and tasks**:
- [x] Merge ExecutorMemoryManager into MemoryManager.
- [x] Move pooling logic into Allocator.
- [x] Move TaskMemoryManager from `spark-unsafe` to `spark-core`.
- [x] Refactor the existing Tungsten TaskMemoryManager interactions so Tungsten code use only this and not both this and ShuffleMemoryManager.
- [x] Refactor non-Tungsten code to use the TaskMemoryManager instead of ShuffleMemoryManager.
- [x] Merge ShuffleMemoryManager into MemoryManager.
- [x] Move code
- [x] ~~Simplify 1/n calculation.~~ **Will defer to followup, since this needs more work.**
- [x] Port ShuffleMemoryManagerSuite tests.
- [x] Move classes from `unsafe` package to `memory` package.
- [ ] Figure out how to handle the hacky use of the memory managers in HashedRelation's broadcast variable construction.
- [x] Test porting and cleanup: several tests relied on mock functionality (such as `TestShuffleMemoryManager.markAsOutOfMemory`) which has been changed or broken during the memory manager consolidation
- [x] AbstractBytesToBytesMapSuite
- [x] UnsafeExternalSorterSuite
- [x] UnsafeFixedWidthAggregationMapSuite
- [x] UnsafeKVExternalSorterSuite
**Compatiblity notes**:
- This patch introduces breaking changes in `ExternalAppendOnlyMap`, which is marked as `DevloperAPI` (likely for legacy reasons): this class now cannot be used outside of a task.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#9127 from JoshRosen/SPARK-10984.
Executing deploy.client.TestClient fails due to bad class name for TestExecutor in ApplicationDescription.
Author: Bryan Cutler <bjcutler@us.ibm.com>
Closes#9255 from BryanCutler/fix-TestClient-classname-SPARK-11287.
Two typos squashed.
BTW Let me know how to proceed with other typos if I ran across any. I don't feel well to leave them aside as much as sending pull requests with such tiny changes. Guide me.
Author: Jacek Laskowski <jacek.laskowski@deepsense.io>
Closes#9250 from jaceklaskowski/typos-hunting.
…ut building with -Phive-thriftserver and SPARK_PREPEND_CLASSES is set
This is the exception after this patch. Please help review.
```
java.lang.NoClassDefFoundError: org/apache/hadoop/hive/cli/CliDriver
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:412)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.util.Utils$.classForName(Utils.scala:173)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:647)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.cli.CliDriver
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 21 more
Failed to load hive class.
You need to build Spark with -Phive and -Phive-thriftserver.
```
Author: Jeff Zhang <zjffdu@apache.org>
Closes#9134 from zjffdu/SPARK-11125.
This test can take a little while to finish on slow / loaded machines.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9235 from vanzin/SPARK-11134.
The current NettyRpc has a message order issue because it uses a thread pool to send messages. E.g., running the following two lines in the same thread,
```
ref.send("A")
ref.send("B")
```
The remote endpoint may see "B" before "A" because sending "A" and "B" are in parallel.
To resolve this issue, this PR added an outbox for each connection, and if we are connecting to the remote node when sending messages, just cache the sending messages in the outbox and send them one by one when the connection is established.
Author: zsxwing <zsxwing@gmail.com>
Closes#9197 from zsxwing/rpc-outbox.
```
// My machine only has 8 cores
$ bin/spark-shell --master local[32]
scala> val df = sc.parallelize(Seq((1, 1), (2, 2))).toDF("a", "b")
scala> df.as("x").join(df.as("y"), $"x.a" === $"y.a").count()
Caused by: java.io.IOException: Unable to acquire 2097152 bytes of memory
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
```
Author: Andrew Or <andrew@databricks.com>
Closes#9209 from andrewor14/fix-local-page-size.
This commit removes unnecessary calls to addPendingTask in
TaskSetManager.executorLost. These calls are unnecessary: for
tasks that are still pending and haven't been launched, they're
still in all of the correct pending lists, so calling addPendingTask
has no effect. For tasks that are currently running (which may still be
in the pending lists, depending on how they were scheduled), we call
addPendingTask in handleFailedTask, so the calls at the beginning
of executorLost are redundant.
I think these calls are left over from when we re-computed the locality
levels in addPendingTask; now that we call recomputeLocality separately,
I don't think these are necessary.
Now that those calls are removed, the readding parameter in addPendingTask
is no longer necessary, so this commit also removes that parameter.
markhamstra can you take a look at this?
cc vanzin
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes#9154 from kayousterhout/SPARK-11163.
The current `NettyRpcEndpointRef.send` can be interrupted because it uses `LinkedBlockingQueue.put`, which may hang the application.
Image the following execution order:
| thread 1: TaskRunner.kill | thread 2: TaskRunner.run
------------- | ------------- | -------------
1 | killed = true |
2 | | if (killed) {
3 | | throw new TaskKilledException
4 | | case _: TaskKilledException _: InterruptedException if task.killed =>
5 | task.kill(interruptThread): interruptThread is true |
6 | | execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
7 | | localEndpoint.send(StatusUpdate(taskId, state, serializedData)): in LocalBackend
Then `localEndpoint.send(StatusUpdate(taskId, state, serializedData))` will throw `InterruptedException`. This will prevent the executor from updating the task status and hang the application.
An failure caused by the above issue here: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44062/consoleFull
Since `receivers` is an unbounded `LinkedBlockingQueue`, we can just use `LinkedBlockingQueue.offer` to resolve this issue.
Author: zsxwing <zsxwing@gmail.com>
Closes#9198 from zsxwing/dont-interrupt-send.
There's a lot of duplication between SortShuffleManager and UnsafeShuffleManager. Given that these now provide the same set of functionality, now that UnsafeShuffleManager supports large records, I think that we should replace SortShuffleManager's serialized shuffle implementation with UnsafeShuffleManager's and should merge the two managers together.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8829 from JoshRosen/consolidate-sort-shuffle-implementations.
Correct the logic to return `HDFSCacheTaskLocation` instance when the input `str` is a in memory location.
Author: zhichao.li <zhichao.li@intel.com>
Closes#9096 from zhichao-li/uselessBranch.
I was looking at this code and found the documentation to be insufficient. I added more documentation, and refactored some relevant code path slightly to improve encapsulation. There are more that I want to do, but I want to get these changes in before doing more work.
My goal is to reduce exposing internal fields directly in ShuffleMapStage to improve encapsulation. After this change, DAGScheduler no longer directly writes outputLocs. There are still 3 places that reads outputLocs directly, but we can change those later.
Author: Reynold Xin <rxin@databricks.com>
Closes#9175 from rxin/stage-cleanup.
`transient` annotations on class parameters (not case class parameters or vals) causes compilation errors during compilation with Scala 2.11.
I understand that transient *parameters* make no sense, however I don't quite understand why the 2.10 compiler accepted them.
Note: in case it is preferred to keep the annotations in case someone would in the future want to redefine them as vals, it would also be possible to just add `val` after the annotation, e.g. `class Foo(transient x: Int)` becomes `class Foo(transient private val x: Int)`.
I chose to remove the annotation as it also reduces needles clutter, however please feel free to tell me if you prefer the second option and I'll update the PR
Author: Jakob Odersky <jodersky@gmail.com>
Closes#9126 from jodersky/sbt-scala-2.11.
I also added some information to container-failure error msgs about what host they failed on, which would have helped me identify the problem that lead me to this JIRA and PR sooner.
Author: Ryan Williams <ryan.blake.williams@gmail.com>
Closes#9147 from ryan-williams/dyn-exec-failures.
This is my own original work and I license this to the project under the project's open source license
Author: Chris Bannister <chris.bannister@swiftkey.com>
Author: Chris Bannister <chris.bannister@swiftkey.net>
Closes#8358 from Zariel/mesos-local-dir.
JIRA: https://issues.apache.org/jira/browse/SPARK-11051
When a `RDD` is materialized and checkpointed, its partitions and dependencies are cleared. If we allow local checkpointing on it and assign `LocalRDDCheckpointData` to its `checkpointData`. Next time when the RDD is materialized again, the error will be thrown.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes#9072 from viirya/no-localcheckpoint-after-checkpoint.
Because the registration RPC was not really an RPC, but a bunch of
disconnected messages, it was possible for other messages to be
sent before the reply to the registration arrived, and that would
confuse the Worker. Especially in local-cluster mode, the worker was
succeptible to receiving an executor request before it received a
message from the master saying registration succeeded.
On top of the above, the change also fixes a ClassCastException when
the registration fails, which also affects the executor registration
protocol. Because the `ask` is issued with a specific return type,
if the error message (of a different type) was returned instead, the
code would just die with an exception. This is fixed by having a common
base trait for these reply messages.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9138 from vanzin/SPARK-11131.
Mesos has a feature for linking to frameworks running on top of Mesos
from the Mesos WebUI. This commit enables Spark to make use of this
feature so one can directly visit the running Spark WebUIs from the
Mesos WebUI.
Author: ph <ph@plista.com>
Closes#9135 from philipphoffmann/SPARK-11129.
Its classdoc actually says; "NOTE: DO NOT USE this class outside of Spark. It is intended as an internal utility."
Author: Reynold Xin <rxin@databricks.com>
Closes#9155 from rxin/private-logging-trait.
Switched from deprecated org.apache.hadoop.fs.permission.AccessControlException to org.apache.hadoop.security.AccessControlException.
Author: gweidner <gweidner@us.ibm.com>
Closes#9144 from gweidner/SPARK-11109.
Some json parsers are not closed. parser in JacksonParser#parseJson, for example.
Author: navis.ryu <navis@apache.org>
Closes#9130 from navis/SPARK-11124.
#9084 uncovered that many tests that test spilling don't actually spill. This is a follow-up patch to fix that to ensure our unit tests actually catch potential bugs in spilling. The size of this patch is inflated by the refactoring of `ExternalSorterSuite`, which had a lot of duplicate code and logic.
Author: Andrew Or <andrew@databricks.com>
Closes#9124 from andrewor14/spilling-tests.
If the heartbeat receiver kills executors (and new ones are not registered to replace them), the idle timeout for the old executors will be lost (and then change a total number of executors requested by Driver), So new ones will be not to asked to replace them.
For example, executorsPendingToRemove=Set(1), and executor 2 is idle timeout before a new executor is asked to replace executor 1. Then driver kill executor 2, and sending RequestExecutors to AM. But executorsPendingToRemove=Set(1,2), So AM doesn't allocate a executor to replace 1.
see: https://github.com/apache/spark/pull/8668
Author: KaiXinXiaoLei <huleilei1@huawei.com>
Author: huleilei <huleilei1@huawei.com>
Closes#8945 from KaiXinXiaoLei/pendingexecutor.
Internal accumulators don't write the internal flag to event log. So on the history server Web UI, all accumulators are not internal. This causes incorrect peak execution memory and unwanted accumulator table displayed on the stage page.
To fix it, I add the "internal" property of AccumulableInfo when writing the event log.
Author: Carson Wang <carson.wang@intel.com>
Closes#9061 from carsonwang/accumulableBug.
Restrict tasks (of job) to only 1 to ensure that the causing Exception asserted for job failure is the deliberately thrown DAGSchedulerSuiteDummyException intended, not an UnsupportedOperationException from any second/subsequent tasks that can propagate from a race condition during code execution.
Author: shellberg <sah@zepler.org>
Closes#9076 from shellberg/shellberg-DAGSchedulerSuite-misbehavedResultHandlerTest-patch-1.
A few more changes:
1. Renamed IDVerifier -> RpcEndpointVerifier
2. Renamed NettyRpcAddress -> RpcEndpointAddress
3. Simplified NettyRpcHandler a bit by removing the connection count tracking. This is OK because I now force spark.shuffle.io.numConnectionsPerPeer to 1
4. Reduced spark.rpc.connect.threads to 64. It would be great to eventually remove this extra thread pool.
5. Minor cleanup & documentation.
Author: Reynold Xin <rxin@databricks.com>
Closes#9112 from rxin/SPARK-11096.
should pick into spark 1.5.2 also.
https://issues.apache.org/jira/browse/SPARK-10619
looks like this was broken by commit: fb1d06fc24 (diff-b8adb646ef90f616c34eb5c98d1ebd16)
It looks like somethings were change to use the UIUtils.listingTable but executor page wasn't converted so when it removed sortable from the UIUtils. TABLE_CLASS_NOT_STRIPED it broke this page.
Simply add the sortable tag back in and it fixes both active UI and the history server UI.
Author: Tom Graves <tgraves@yahoo-inc.com>
Closes#9101 from tgravescs/SPARK-10619.
This patch unifies the memory management of the storage and execution regions such that either side can borrow memory from each other. When memory pressure arises, storage will be evicted in favor of execution. To avoid regressions in cases where storage is crucial, we dynamically allocate a fraction of space for storage that execution cannot evict. Several configurations are introduced:
- **spark.memory.fraction (default 0.75)**: fraction of the heap space used for execution and storage. The lower this is, the more frequently spills and cached data eviction occur. The purpose of this config is to set aside memory for internal metadata, user data structures, and imprecise size estimation in the case of sparse, unusually large records.
- **spark.memory.storageFraction (default 0.5)**: size of the storage region within the space set aside by `spark.memory.fraction`. Cached data may only be evicted if total storage exceeds this region.
- **spark.memory.useLegacyMode (default false)**: whether to use the memory management that existed in Spark 1.5 and before. This is mainly for backward compatibility.
For a detailed description of the design, see [SPARK-10000](https://issues.apache.org/jira/browse/SPARK-10000). This patch builds on top of the `MemoryManager` interface introduced in #9000.
Author: Andrew Or <andrew@databricks.com>
Closes#9084 from andrewor14/unified-memory-manager.
Two points in this PR:
1. Originally thought was that a named R list is assumed to be a struct in SerDe. But this is problematic because some R functions will implicitly generate named lists that are not intended to be a struct when transferred by SerDe. So SerDe clients have to explicitly mark a names list as struct by changing its class from "list" to "struct".
2. SerDe is in the Spark Core module, and data of StructType is represented as GenricRow which is defined in Spark SQL module. SerDe can't import GenricRow as in maven build Spark SQL module depends on Spark Core module. So this PR adds a registration hook in SerDe to allow SQLUtils in Spark SQL module to register its functions for serialization and deserialization of StructType.
Author: Sun Rui <rui.sun@intel.com>
Closes#8794 from sun-rui/SPARK-10051.
I'm going through the implementation right now for post-doc review. Adding more comments and renaming things as I go through them.
I also want to write higher level documentation about how the whole thing works -- but those will come in other pull requests.
Author: Reynold Xin <rxin@databricks.com>
Closes#9091 from rxin/rpc-review.
https://issues.apache.org/jira/browse/SPARK-10858
The issue here is that in resolveURI we default to calling new File(path).getAbsoluteFile().toURI(). But if the path passed in already has a # in it then File(path) will think that is supposed to be part of the actual file path and not a fragment so it changes # to %23. Then when we try to parse that later in Client as a URI it doesn't recognize there is a fragment.
so to fix we just check if there is a fragment, still create the File like we did before and then add the fragment back on.
Author: Tom Graves <tgraves@yahoo-inc.com>
Closes#9035 from tgravescs/SPARK-10858.
This change adds an API that encapsulates information about an app
launched using the library. It also creates a socket-based communication
layer for apps that are launched as child processes; the launching
application listens for connections from launched apps, and once
communication is established, the channel can be used to send updates
to the launching app, or to send commands to the child app.
The change also includes hooks for local, standalone/client and yarn
masters.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#7052 from vanzin/SPARK-8673.
This patch introduces a `MemoryManager` that is the central arbiter of how much memory to grant to storage and execution. This patch is primarily concerned only with refactoring while preserving the existing behavior as much as possible.
This is the first step away from the existing rigid separation of storage and execution memory, which has several major drawbacks discussed on the [issue](https://issues.apache.org/jira/browse/SPARK-10956). It is the precursor of a series of patches that will attempt to address those drawbacks.
Author: Andrew Or <andrew@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>
Author: andrewor14 <andrew@databricks.com>
Closes#9000 from andrewor14/memory-manager.
In YARN client mode, when the AM connects to the driver, it may be the case
that the driver never needs to send a message back to the AM (i.e., no
dynamic allocation or preemption). This triggers an issue in the netty rpc
backend where no disconnection event is sent to endpoints, and the AM never
exits after the driver shuts down.
The real fix is too complicated, so this is a quick hack to unblock YARN
client mode until we can work on the real fix. It forces the driver to
send a message to the AM when the AM registers, thus establishing that
connection and enabling the disconnection event when the driver goes
away.
Also, a minor side issue: when the executor is shutting down, it needs
to send an "ack" back to the driver when using the netty rpc backend; but
that "ack" wasn't being sent because the handler was shutting down the rpc
env before returning. So added a change to delay the shutdown a little bit,
allowing the ack to be sent back.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9021 from vanzin/SPARK-10987.
The `self` method returns null when called from the constructor;
instead, registration should happen in the `onStart` method, at
which point the `self` reference has already been initialized.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#9005 from vanzin/SPARK-10964.
This makes YARN containers behave like all other processes launched by
Spark, which launch with a default perm gen size of 256m unless
overridden by the user (or not needed by the vm).
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#8970 from vanzin/SPARK-10916.
This PR just reverted 02144d6745 to remerge #6457 and also included the commits in #8905.
Author: zsxwing <zsxwing@gmail.com>
Closes#8944 from zsxwing/SPARK-6028.
Compatibility between history server script and functionality
The history server has its argument parsing class in HistoryServerArguments. However, this doesn't get involved in the start-history-server.sh codepath where the $0 arg is assigned to spark.history.fs.logDirectory and all other arguments discarded (e.g --property-file.)
This stops the other options being usable from this script
Author: Joshi <rekhajoshm@gmail.com>
Author: Rekha Joshi <rekhajoshm@gmail.com>
Closes#8758 from rekhajoshm/SPARK-10317.
The utilities such as Substring#substringBinarySQL and BinaryPrefixComparator#computePrefix for binary data are put together in ByteArray for easy-to-read.
Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
Closes#8122 from maropu/CleanUpForBinaryType.
The YARN backend doesn't like when user code calls System.exit, since it cannot know the exit status and thus cannot set an appropriate final status for the application.
This PR remove the usage of system.exit to exit the RRunner. Instead, when the R process running an SparkR script returns an exit code other than 0, throws SparkUserAppException which will be caught by ApplicationMaster and ApplicationMaster knows it failed. For other failures, throws SparkException.
Author: Sun Rui <rui.sun@intel.com>
Closes#8938 from sun-rui/SPARK-10851.
Fix the following issues in StandaloneDynamicAllocationSuite:
1. It should not assume master and workers start in order
2. It should not assume master and workers get ready at once
3. It should not assume the application is already registered with master after creating SparkContext
4. It should not access Master.app and idToApp which are not thread safe
The changes includes:
* Use `eventually` to wait until master and workers are ready to fix 1 and 2
* Use `eventually` to wait until the application is registered with master to fix 3
* Use `askWithRetry[MasterStateResponse](RequestMasterState)` to get the application info to fix 4
Author: zsxwing <zsxwing@gmail.com>
Closes#8914 from zsxwing/fix-StandaloneDynamicAllocationSuite.
In the course of https://issues.apache.org/jira/browse/LEGAL-226 it came to light that the guidance at http://www.apache.org/dev/licensing-howto.html#permissive-deps means that permissively-licensed dependencies has a different interpretation than we (er, I) had been operating under. "pointer ... to the license within the source tree" specifically means a copy of the license within Spark's distribution, whereas at the moment, Spark's LICENSE has a pointer to the project's license in the other project's source tree.
The remedy is simply to inline all such license references (i.e. BSD/MIT licenses) or include their text in "licenses" subdirectory and point to that.
Along the way, we can also treat other BSD/MIT licenses, whose text has been inlined into LICENSE, in the same way.
The LICENSE file can continue to provide a helpful list of BSD/MIT licensed projects and a pointer to their sites. This would be over and above including license text in the distro, which is the essential thing.
Author: Sean Owen <sowen@cloudera.com>
Closes#8919 from srowen/SPARK-10833.
While this is likely not a huge issue for real production systems, for test systems which may setup a Spark Context and tear it down and stand up a Spark Context with a different master (e.g. some local mode & some yarn mode) tests this cane be an issue. Discovered during work on spark-testing-base on Spark 1.4.1, but seems like the logic that triggers it is present in master (see SparkHadoopUtil object). A valid work around for users encountering this issue is to fork a different JVM, however this can be heavy weight.
```
[info] SampleMiniClusterTest:
[info] Exception encountered when attempting to run a suite with class name: com.holdenkarau.spark.testing.SampleMiniClusterTest *** ABORTED ***
[info] java.lang.ClassCastException: org.apache.spark.deploy.SparkHadoopUtil cannot be cast to org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
[info] at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$.get(YarnSparkHadoopUtil.scala:163)
[info] at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:257)
[info] at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:561)
[info] at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:115)
[info] at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
[info] at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
[info] at org.apache.spark.SparkContext.<init>(SparkContext.scala:497)
[info] at com.holdenkarau.spark.testing.SharedMiniCluster$class.setup(SharedMiniCluster.scala:186)
[info] at com.holdenkarau.spark.testing.SampleMiniClusterTest.setup(SampleMiniClusterTest.scala:26)
[info] at com.holdenkarau.spark.testing.SharedMiniCluster$class.beforeAll(SharedMiniCluster.scala:103)
```
Author: Holden Karau <holden@pigscanfly.ca>
Closes#8911 from holdenk/SPARK-10812-spark-hadoop-util-support-switching-to-yarn.
This makes two changes:
- Allow reduce tasks to fetch multiple map output partitions -- this is a pretty small change to HashShuffleFetcher
- Move shuffle locality computation out of DAGScheduler and into ShuffledRDD / MapOutputTracker; this was needed because the code in DAGScheduler wouldn't work for RDDs that fetch multiple map output partitions from each reduce task
I also added an AdaptiveSchedulingSuite that creates RDDs depending on multiple map output partitions.
Author: Matei Zaharia <matei@databricks.com>
Closes#8844 from mateiz/spark-9852.
The DiskBlockObjectWriter constructor took a BlockId parameter but never used it. As part of some general cleanup in these interfaces, this patch refactors its constructor to eliminate this parameter.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8871 from JoshRosen/disk-block-object-writer-blockid-cleanup.
This patch reverts most of the changes in a previous fix#8827.
The real cause of the issue is that in `TungstenAggregate`'s prepare method we only reserve 1 page, but later when we switch to sort-based aggregation we try to acquire 1 page AND a pointer array. The longer-term fix should be to reserve also the pointer array, but for now ***we will simply not track the pointer array***. (Note that elsewhere we already don't track the pointer array, e.g. [here](a18208047f/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java (L88)))
Note: This patch reuses the unit test added in #8827 so it doesn't show up in the diff.
Author: Andrew Or <andrew@databricks.com>
Closes#8888 from andrewor14/dont-track-pointer-array.
Python DataFrame.head/take now requires scanning all the partitions. This pull request changes them to delegate the actual implementation to Scala DataFrame (by calling DataFrame.take).
This is more of a hack for fixing this issue in 1.5.1. A more proper fix is to change executeCollect and executeTake to return InternalRow rather than Row, and thus eliminate the extra round-trip conversion.
Author: Reynold Xin <rxin@databricks.com>
Closes#8876 from rxin/SPARK-10731.
This patch refactors Python UDF handling:
1. Extract the per-partition Python UDF calling logic from PythonRDD into a PythonRunner. PythonRunner itself expects iterator as input/output, and thus has no dependency on RDD. This way, we can use PythonRunner directly in a mapPartitions call, or in the future in an environment without RDDs.
2. Use PythonRunner in Spark SQL's BatchPythonEvaluation.
3. Updated BatchPythonEvaluation to only use its input once, rather than twice. This should fix Python UDF performance regression in Spark 1.5.
There are a number of small cleanups I wanted to do when I looked at the code, but I kept most of those out so the diff looks small.
This basically implements the approach in https://github.com/apache/spark/pull/8833, but with some code moving around so the correctness doesn't depend on the inner workings of Spark serialization and task execution.
Author: Reynold Xin <rxin@databricks.com>
Closes#8835 from rxin/python-iter-refactor.
The current shuffle code has an interface named ShuffleReader with only one implementation, HashShuffleReader. This naming is confusing, since the same read path code is used for both sort- and hash-based shuffle. This patch addresses this by renaming HashShuffleReader to BlockStoreShuffleReader.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8825 from JoshRosen/shuffle-reader-cleanup.
If we cache the InputFormat, all tasks on the same executor will share it.
Some InputFormat is thread safety, but some are not, such as HiveHBaseTableInputFormat. If tasks share a non thread safe InputFormat, unexpected error may be occurs.
To avoid it, I think we should delete the input format caching.
Author: xutingjun <xutingjun@huawei.com>
Author: meiyoula <1039320815@qq.com>
Author: Xutingjun <xutingjun@huawei.com>
Closes#7918 from XuTingjun/cached_inputFormat.
In ```RUtils.sparkRPackagePath()``` we
1. Call ``` sys.props("spark.submit.deployMode")``` which returns null if ```spark.submit.deployMode``` is not suet
2. Call ``` sparkConf.get("spark.submit.deployMode")``` which throws ```NoSuchElementException``` if ```spark.submit.deployMode``` is not set. This patch simply passes a default value ("cluster") for ```spark.submit.deployMode```.
cc rxin
Author: Hossein <hossein@databricks.com>
Closes#8832 from falaki/SPARK-10711.
The job group, and job descriptions information is passed through thread local properties, and get inherited by child threads. In case of spark streaming, the streaming jobs inherit these properties from the thread that called streamingContext.start(). This may not make sense.
1. Job group: This is mainly used for cancelling a group of jobs together. It does not make sense to cancel streaming jobs like this, as the effect will be unpredictable. And its not a valid usecase any way, to cancel a streaming context, call streamingContext.stop()
2. Job description: This is used to pass on nice text descriptions for jobs to show up in the UI. The job description of the thread that calls streamingContext.start() is not useful for all the streaming jobs, as it does not make sense for all of the streaming jobs to have the same description, and the description may or may not be related to streaming.
The solution in this PR is meant for the Spark master branch, where local properties are inherited by cloning the properties. The job group and job description in the thread that starts the streaming scheduler are explicitly removed, so that all the subsequent child threads does not inherit them. Also, the starting is done in a new child thread, so that setting the job group and description for streaming, does not change those properties in the thread that called streamingContext.start().
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes#8781 from tdas/SPARK-10649.
Track pending tasks by partition ID instead of Task objects.
Before this change, failure & retry could result in a case where a stage got submitted before the map output from its dependencies get registered. This was due to an error in the condition for registering map outputs.
Author: hushan[胡珊] <hushan@xiaomi.com>
Author: Imran Rashid <irashid@cloudera.com>
Closes#7699 from squito/SPARK-5259.
I noticed only one block manager registered with master in an unsuccessful build (https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop2.2,label=spark-test/3534/)
```
15/09/16 13:02:30.981 pool-1-thread-1-ScalaTest-running-BroadcastSuite INFO SparkContext: Running Spark version 1.6.0-SNAPSHOT
...
15/09/16 13:02:38.133 sparkDriver-akka.actor.default-dispatcher-19 INFO BlockManagerMasterEndpoint: Registering block manager localhost:48196 with 530.3 MB RAM, BlockManagerId(0, localhost, 48196)
```
In addition, the first block manager needed 7+ seconds to start. But the test expected 2 block managers so it failed.
However, there was no exception in this log file. So I checked a successful build (https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/3536/AMPLAB_JENKINS_BUILD_PROFILE=hadoop2.2,label=spark-test/) and it needed 4-5 seconds to set up the local cluster:
```
15/09/16 18:11:27.738 sparkWorker1-akka.actor.default-dispatcher-5 INFO Worker: Running Spark version 1.6.0-SNAPSHOT
...
15/09/16 18:11:30.838 sparkDriver-akka.actor.default-dispatcher-20 INFO BlockManagerMasterEndpoint: Registering block manager localhost:54202 with 530.3 MB RAM, BlockManagerId(1, localhost, 54202)
15/09/16 18:11:32.112 sparkDriver-akka.actor.default-dispatcher-20 INFO BlockManagerMasterEndpoint: Registering block manager localhost:32955 with 530.3 MB RAM, BlockManagerId(0, localhost, 32955)
```
In this build, the first block manager needed only 3+ seconds to start.
Comparing these two builds, I guess it's possible that the local cluster in `BroadcastSuite` cannot be ready in 10 seconds if the Jenkins worker is busy. So I just increased the timeout to 60 seconds to see if this can fix the issue.
Author: zsxwing <zsxwing@gmail.com>
Closes#8813 from zsxwing/fix-BroadcastSuite.
It does not make much sense to set `spark.shuffle.spill` or `spark.sql.planner.externalSort` to false: I believe that these configurations were initially added as "escape hatches" to guard against bugs in the external operators, but these operators are now mature and well-tested. In addition, these configurations are not handled in a consistent way anymore: SQL's Tungsten codepath ignores these configurations and will continue to use spilling operators. Similarly, Spark Core's `tungsten-sort` shuffle manager does not respect `spark.shuffle.spill=false`.
This pull request removes these configurations, adds warnings at the appropriate places, and deletes a large amount of code which was only used in code paths that did not support spilling.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8831 from JoshRosen/remove-ability-to-disable-spilling.
When `TungstenAggregation` hits memory pressure, it switches from hash-based to sort-based aggregation in-place. However, in the process we try to allocate the pointer array for writing to the new `UnsafeExternalSorter` *before* actually freeing the memory from the hash map. This lead to the following exception:
```
java.io.IOException: Could not acquire 65536 bytes of memory
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.initializeForWriting(UnsafeExternalSorter.java:169)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:220)
at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:126)
at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435)
```
Author: Andrew Or <andrew@databricks.com>
Closes#8827 from andrewor14/allocate-pointer-array.
This patch attempts to fix the Hadoop Configuration thread safety issue for NewHadoopRDD in the same way SPARK-2546 fixed the issue for HadoopRDD.
Author: Mingyu Kim <mkim@palantir.com>
Closes#8763 from mingyukim/mkim/SPARK-10611.
When we start HiveThriftServer, we will start SparkContext first, then start HiveServer2, if we kill application while HiveServer2 is starting then SparkContext will stop successfully, but SparkSubmit process can not exit.
Author: linweizhong <linweizhong@huawei.com>
Closes#7853 from Sephiroth-Lin/SPARK-9522.
This pull request is to address the JIRA SPARK-10172 (History Server web UI gets messed up when sorting on any column).
The content of the table gets messed up due to the rowspan attribute of the table data(cell) during sorting.
The current table sort library used in SparkUI (sorttable.js) doesn't support/handle cells(td) with rowspans.
The fix will disable the table sort in the web UI, when there are jobs listed with multiple attempts.
Author: Josiah Samuel <josiah_sams@in.ibm.com>
Closes#8506 from josiahsams/SPARK-10172.
1. Support collecting data of MapType from DataFrame.
2. Support data of MapType in createDataFrame.
Author: Sun Rui <rui.sun@intel.com>
Closes#8711 from sun-rui/SPARK-10050.
Set `X-Frame-Options: SAMEORIGIN` to protect against frame-related vulnerability
Author: Sean Owen <sowen@cloudera.com>
Closes#8745 from srowen/SPARK-10589.
When speculative execution is enabled, consider a scenario where the authorized committer of a particular output partition fails during the OutputCommitter.commitTask() call. In this case, the OutputCommitCoordinator is supposed to release that committer's exclusive lock on committing once that task fails. However, due to a unit mismatch (we used task attempt number in one place and task attempt id in another) the lock will not be released, causing Spark to go into an infinite retry loop.
This bug was masked by the fact that the OutputCommitCoordinator does not have enough end-to-end tests (the current tests use many mocks). Other factors contributing to this bug are the fact that we have many similarly-named identifiers that have different semantics but the same data types (e.g. attemptNumber and taskAttemptId, with inconsistent variable naming which makes them difficult to distinguish).
This patch adds a regression test and fixes this bug by always using task attempt numbers throughout this code.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#8544 from JoshRosen/SPARK-10381.
Remove return statements in RDD.takeSample and wrap it withScope
Author: vinodkc <vinod.kc.in@gmail.com>
Author: vinodkc <vinodkc@users.noreply.github.com>
Author: Vinod K C <vinod.kc@huawei.com>
Closes#8730 from vinodkc/fix_takesample_return.
*Note: this is for master branch only.* The fix for branch-1.5 is at #8721.
The query execution ID is currently passed from a thread to its children, which is not the intended behavior. This led to `IllegalArgumentException: spark.sql.execution.id is already set` when running queries in parallel, e.g.:
```
(1 to 100).par.foreach { _ =>
sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count()
}
```
The cause is `SparkContext`'s local properties are inherited by default. This patch adds a way to exclude keys we don't want to be inherited, and makes SQL go through that code path.
Author: Andrew Or <andrew@databricks.com>
Closes#8710 from andrewor14/concurrent-sql-executions.
This patch adds support for submitting map stages in a DAG individually so that we can make downstream decisions after seeing statistics about their output, as part of SPARK-9850. I also added more comments to many of the key classes in DAGScheduler. By itself, the patch is not super useful except maybe to switch between a shuffle and broadcast join, but with the other subtasks of SPARK-9850 we'll be able to do more interesting decisions.
The main entry point is SparkContext.submitMapStage, which lets you run a map stage and see stats about the map output sizes. Other stats could also be collected through accumulators. See AdaptiveSchedulingSuite for a short example.
Author: Matei Zaharia <matei@databricks.com>
Closes#8180 from mateiz/spark-9851.
This is a follow-up patch to #8723. I missed one case there.
Author: Andrew Or <andrew@databricks.com>
Closes#8727 from andrewor14/fix-threading-suite.
Move .java files in `src/main/scala` to `src/main/java` root, except for `package-info.java` (to stay next to package.scala)
Author: Sean Owen <sowen@cloudera.com>
Closes#8736 from srowen/SPARK-10576.
This is a follow-up of https://github.com/apache/spark/pull/8317.
When speculation is enabled, there may be multiply tasks writing to the same path. Generally it's OK as we will write to a temporary directory first and only one task can commit the temporary directory to target path.
However, when we use direct output committer, tasks will write data to target path directly without temporary directory. This causes problems like corrupted data. Please see [PR comment](https://github.com/apache/spark/pull/8191#issuecomment-131598385) for more details.
Unfortunately, we don't have a simple flag to tell if a output committer will write to temporary directory or not, so for safety, we have to disable any customized output committer when `speculation` is true.
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes#8687 from cloud-fan/direct-committer.