In tests, we may want to have BlockManagers of size < 1MB (spark.storage.unrollMemoryThreshold). However, these BlockManagers are useless because we can't unroll anything in them ever. At the very least we need to log a warning.
tdas
Author: Andrew Or <andrew@databricks.com>
Closes#2917 from andrewor14/unroll-safely-logging and squashes the following commits:
38947e3 [Andrew Or] Warn against starting a block manager that's too small
fd621b4 [Andrew Or] Warn against failure to reserve initial memory threshold
This pull request is a first step towards the implementation of a stable, pull-based progress / status API for Spark (see [SPARK-2321](https://issues.apache.org/jira/browse/SPARK-2321)). For now, I'd like to discuss the basic implementation, API names, and overall interface design. Once we arrive at a good design, I'll go back and add additional methods to expose more information via these API.
#### Design goals:
- Pull-based API
- Usable from Java / Scala / Python (eventually, likely with a wrapper)
- Can be extended to expose more information without introducing binary incompatibilities.
- Returns immutable objects.
- Don't leak any implementation details, preserving our freedom to change the implementation.
#### Implementation:
- Add public methods (`getJobInfo`, `getStageInfo`) to SparkContext to allow status / progress information to be retrieved.
- Add public interfaces (`SparkJobInfo`, `SparkStageInfo`) for our API return values. These interfaces consist entirely of Java-style getter methods. The interfaces are currently implemented in Java. I decided to explicitly separate the interface from its implementation (`SparkJobInfoImpl`, `SparkStageInfoImpl`) in order to prevent users from constructing these responses themselves.
-Allow an existing JobProgressListener to be used when constructing a live SparkUI. This allows us to re-use this listeners in the implementation of this status API. There are a few reasons why this listener re-use makes sense:
- The status API and web UI are guaranteed to show consistent information.
- These listeners are already well-tested.
- The same garbage-collection / information retention configurations can apply to both this API and the web UI.
- Extend JobProgressListener to maintain `jobId -> Job` and `stageId -> Stage` mappings.
The progress API methods are implemented in a separate trait that's mixed into SparkContext. This helps to avoid SparkContext.scala from becoming larger and more difficult to read.
Author: Josh Rosen <joshrosen@databricks.com>
Author: Josh Rosen <joshrosen@apache.org>
Closes#2696 from JoshRosen/progress-reporting-api and squashes the following commits:
e6aa78d [Josh Rosen] Add tests.
b585c16 [Josh Rosen] Accept SparkListenerBus instead of more specific subclasses.
c96402d [Josh Rosen] Address review comments.
2707f98 [Josh Rosen] Expose current stage attempt id
c28ba76 [Josh Rosen] Update demo code:
646ff1d [Josh Rosen] Document spark.ui.retainedJobs.
7f47d6d [Josh Rosen] Clean up SparkUI constructors, per Andrew's feedback.
b77b3d8 [Josh Rosen] Merge remote-tracking branch 'origin/master' into progress-reporting-api
787444c [Josh Rosen] Move status API methods into trait that can be mixed into SparkContext.
f9a9a00 [Josh Rosen] More review comments:
3dc79af [Josh Rosen] Remove creation of unused listeners in SparkContext.
249ca16 [Josh Rosen] Address several review comments:
da5648e [Josh Rosen] Add example of basic progress reporting in Java.
7319ffd [Josh Rosen] Add getJobIdsForGroup() and num*Tasks() methods.
cc568e5 [Josh Rosen] Add note explaining that interfaces should not be implemented outside of Spark.
6e840d4 [Josh Rosen] Remove getter-style names and "consistent snapshot" semantics:
08cbec9 [Josh Rosen] Begin to sketch the interfaces for a stable, public status API.
ac2d13a [Josh Rosen] Add jobId->stage, stageId->stage mappings in JobProgressListener
24de263 [Josh Rosen] Create UI listeners in SparkContext instead of in Tabs:
If classes implementing Serializable or Externalizable interfaces throw
exceptions other than IOException or ClassNotFoundException from their
(de)serialization methods, then this results in an unhelpful
"IOException: unexpected exception type" rather than the actual exception that
produced the (de)serialization error.
This patch fixes this by adding a utility method that re-wraps any uncaught
exceptions in IOException (unless they are already instances of IOException).
Author: Josh Rosen <joshrosen@databricks.com>
Closes#2932 from JoshRosen/SPARK-4080 and squashes the following commits:
cd3a9be [Josh Rosen] [SPARK-4080] Only throw IOException from [write|read][Object|External].
https://issues.apache.org/jira/browse/SPARK-4067
currently , we call Utils.tryOrExit everywhere
AppClient
Executor
TaskSchedulerImpl
It makes the name of ExecutorUncaughtExceptionHandler unfit to the real case....
Author: Nan Zhu <nanzhu@Nans-MacBook-Pro.local>
Author: Nan Zhu <nanzhu@nans-mbp.home>
Closes#2913 from CodingCat/SPARK-4067 and squashes the following commits:
035ee3d [Nan Zhu] make RAT happy
e62e416 [Nan Zhu] add some general Exit code
a10b63f [Nan Zhu] refactor
In the existing code, each coarse-grained executor has two concurrently running actor systems. This causes many more error messages to be logged than necessary when the executor is lost or killed because we receive a disassociation event for each of these actor systems.
This is blocking #2840.
Author: Andrew Or <andrewor14@gmail.com>
Closes#2863 from andrewor14/executor-actor-system and squashes the following commits:
44ce2e0 [Andrew Or] Avoid starting two actor systems on each executor
In deploy.ClientArguments.isValidJarUrl, the url is checked as follows.
def isValidJarUrl(s: String): Boolean = s.matches("(.+):(.+)jar")
So, it allows like 'hdfs:file.jar' (no authority).
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes#2925 from sarutak/uri-syntax-check-improvement and squashes the following commits:
cf06173 [Kousuke Saruta] Improved URI syntax checking
If Spark lunched multiple executors in one host for one application, every executor would download it dependent files and jars (if not using local: url) independently. It maybe result in huge latency. In my case, it result in 20 seconds latency to download dependent jars(size about 17M) when I lunched 32 executors in every host(total 4 hosts).
This patch will cache downloaded files and jars for executors to reduce network throughput and download latency. In my case, the latency was reduced from 20 seconds to less than 1 second.
Author: Li Zhihui <zhihui.li@intel.com>
Author: li-zhihui <zhihui.li@intel.com>
Closes#1616 from li-zhihui/cachefiles and squashes the following commits:
36940df [Li Zhihui] Close cache for local mode
935fed6 [Li Zhihui] Clean code.
f9330d4 [Li Zhihui] Clean code again
7050d46 [Li Zhihui] Clean code
074a422 [Li Zhihui] Fix: deal with spark.files.overwrite
03ed3a8 [li-zhihui] rename cache file name as XXXXXXXXX_cache
2766055 [li-zhihui] Use url.hashCode + timestamp as cachedFileName
76a7b66 [Li Zhihui] Clean code & use applcation work directory as cache directory
3510eb0 [Li Zhihui] Keep fetchFile private
2ffd742 [Li Zhihui] add comment for FileLock
e0ebd48 [Li Zhihui] Try and finally lock.release
7fb7c0b [Li Zhihui] Release lock before copy files
6b997bf [Li Zhihui] Executors of same application in same host should only download files & jars once
After take(), maybe there are some garbage left in the socket, then next task assigned to this worker will hang because of corrupted data.
We should make sure the socket is clean before reuse it, write END_OF_STREAM at the end, and check it after read out all result from python.
Author: Davies Liu <davies.liu@gmail.com>
Author: Davies Liu <davies@databricks.com>
Closes#2838 from davies/fix_reuse and squashes the following commits:
8872914 [Davies Liu] fix tests
660875b [Davies Liu] fix bug while reuse worker after take()
This commit fixes a bug in MapStatus that could cause jobs to wrongly return
empty results if those jobs contained stages with more than 2000 partitions
where most of those partitions were empty.
For jobs with > 2000 partitions, MapStatus uses HighlyCompressedMapStatus,
which only stores the average size of blocks. If the average block size is
zero, then this will cause all blocks to be reported as empty, causing
BlockFetcherIterator to mistakenly skip them.
For example, this would return an empty result:
sc.makeRDD(0 until 10, 1000).repartition(2001).collect()
This can also lead to deserialization errors (e.g. Snappy decoding errors)
for jobs with > 2000 partitions where the average block size is non-zero but
there is at least one empty block. In this case, the BlockFetcher attempts to
fetch empty blocks and fails when trying to deserialize them.
The root problem here is that MapStatus has a (previously undocumented)
correctness property that was violated by HighlyCompressedMapStatus:
If a block is non-empty, then getSizeForBlock must be non-zero.
I fixed this by modifying HighlyCompressedMapStatus to store the average size
of _non-empty_ blocks and to use a compressed bitmap to track which blocks are
empty.
I also removed a test which was broken as originally written: it attempted
to check that HighlyCompressedMapStatus's size estimation error was < 10%,
but this was broken because HighlyCompressedMapStatus is only used for map
statuses with > 2000 partitions, but the test only created 50.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#2866 from JoshRosen/spark-4019 and squashes the following commits:
fc8b490 [Josh Rosen] Roll back hashset change, which didn't improve performance.
5faa0a4 [Josh Rosen] Incorporate review feedback
c8b8cae [Josh Rosen] Two performance fixes:
3b892dd [Josh Rosen] Address Reynold's review comments
ba2e71c [Josh Rosen] Add missing newline
609407d [Josh Rosen] Use Roaring Bitmap to track non-empty blocks.
c23897a [Josh Rosen] Use sets when comparing collect() results
91276a3 [Josh Rosen] [SPARK-4019] Fix MapStatus compression bug that could lead to empty results.
...r without a remove in between. The cause for that is unknown, and assumed a temp network issue.
However, since the second register is with a BlockManagerId on a different port, blockManagerInfo.contains() returns false, while blockManagerIdByExecutor returns Some. This inconsistency is caught in a conditional statement that does System.exit(1), which is a huge robustness issue for us.
The fix - simply remove the old id from both maps during register when this happens. We are mimicking the behavior of expireDeadHosts(), by doing local cleanup of the maps before trying to add new ones.
Also - added some logging for register and unregister.
This is just like https://github.com/apache/spark/pull/2854 except it's on master
Author: Tal Sliwowicz <tal.s@taboola.com>
Closes#2886 from tsliwowicz/master-block-mgr-removal and squashes the following commits:
094d508 [Tal Sliwowicz] some more white space change undone
41a2217 [Tal Sliwowicz] some more whitspaces change undone
7bcfc3d [Tal Sliwowicz] whitspaces fix
df9d98f [Tal Sliwowicz] Code review comments fixed
f48bce9 [Tal Sliwowicz] In long running contexts, we encountered the situation of double register without a remove in between. The cause for that is unknown, and assumed a temp network issue.
This PR fixes SPARK-3426, an issue where sort-based shuffle crashes if the
`spark.shuffle.spill.compress` and `spark.shuffle.compress` settings have
different values.
The problem is that sort-based shuffle's read and write paths use different
settings for determining whether to apply compression. ExternalSorter writes
runs to files using `TempBlockId` ids, which causes
`spark.shuffle.spill.compress` to be used for enabling compression, but these
spilled files end up being shuffled over the network and read as shuffle files
using `ShuffleBlockId` by BlockStoreShuffleFetcher, which causes
`spark.shuffle.compress` to be used for enabling decompression. As a result,
this leads to errors when these settings disagree.
Based on the discussions in #2247 and #2178, it sounds like we don't want to
remove the `spark.shuffle.spill.compress` setting. Therefore, I've tried to
come up with a fix where `spark.shuffle.spill.compress` is used to compress
data that's read and written locally and `spark.shuffle.compress` is used to
compress any data that will be fetched / read as shuffle blocks.
To do this, I split `TempBlockId` into two new id types, `TempLocalBlockId` and
`TempShuffleBlockId`, which map to `spark.shuffle.spill.compress` and
`spark.shuffle.compress`, respectively. ExternalAppendOnlyMap also used temp
blocks for spilling data. It looks like ExternalSorter was designed to be
a generic sorter but its configuration already happens to be tied to sort-based
shuffle, so I think it's fine if we use `spark.shuffle.compress` to compress
its spills; we can move the compression configuration to the constructor in
a later commit if we find that ExternalSorter is being used in other contexts
where we want different configuration options to control compression. To
summarize:
**Before:**
| | ExternalAppendOnlyMap | ExternalSorter |
|-------|------------------------------|------------------------------|
| Read | spark.shuffle.spill.compress | spark.shuffle.compress |
| Write | spark.shuffle.spill.compress | spark.shuffle.spill.compress |
**After:**
| | ExternalAppendOnlyMap | ExternalSorter |
|-------|------------------------------|------------------------|
| Read | spark.shuffle.spill.compress | spark.shuffle.compress |
| Write | spark.shuffle.spill.compress | spark.shuffle.compress |
Thanks to andrewor14 for debugging this with me!
Author: Josh Rosen <joshrosen@databricks.com>
Closes#2890 from JoshRosen/SPARK-3426 and squashes the following commits:
1921cf6 [Josh Rosen] Minor edit for clarity.
c8dd8f2 [Josh Rosen] Add comment explaining use of createTempShuffleBlock().
2c687b9 [Josh Rosen] Fix SPARK-3426.
91e7e40 [Josh Rosen] Combine tests into single test of all combinations
76ca65e [Josh Rosen] Add regression test for SPARK-3426.
runningLocally is deprecated now
Author: CrazyJvm <crazyjvm@gmail.com>
Closes#2879 from CrazyJvm/runningLocally and squashes the following commits:
bec0b3e [CrazyJvm] use isRunningLocally rather than runningLocally
Author: Sandy Ryza <sandy@cloudera.com>
Closes#789 from sryza/sandy-spark-1813 and squashes the following commits:
48b05e9 [Sandy Ryza] Simplify
b824932 [Sandy Ryza] Allow both spark.kryo.classesToRegister and spark.kryo.registrator at the same time
6a15bb7 [Sandy Ryza] Small fix
a2278c0 [Sandy Ryza] Respond to review comments
6ef592e [Sandy Ryza] SPARK-1813. Add a utility to SparkConf that makes using Kryo really easy
See [JIRA](https://issues.apache.org/jira/browse/SPARK-3994) for more information. Also adds
a note which warns against using these methods.
Author: Aaron Davidson <aaron@databricks.com>
Closes#2839 from aarondav/countByKey and squashes the following commits:
d6fdb2a [Aaron Davidson] Respond to comments
e1f06d3 [Aaron Davidson] [SPARK-3994] Use standard Aggregator code path for countByKey and countByValue
If an executor fails without being scheduled to run any tasks, then `DAGScheduler` won't notify `BlockManagerMasterActor` that the associated block manager should be removed. Instead, the associated block manager will be expired only after a few rounds of heartbeat timeouts. In terms of removal treatment, there should really be no distinction between executors that have been scheduled tasks and those that have not.
The fix, then, is to add all known executors to `TaskSchedulerImpl`'s `activeExecutorIds` whether or not it has been scheduled a task. In fact, the existing comment above `activeExecutorIds` is
```
// Which executor IDs we have executors on
val activeExecutorIds = new HashSet[String]
```
not "Which executors have been scheduled tasks thus far."
Author: Andrew Or <andrewor14@gmail.com>
Closes#2865 from andrewor14/active-executors and squashes the following commits:
ff3172b [Andrew Or] Add all known executors to `activeExecutorIds`
Just found a typo. Should not use "%f" for Long.
Author: zsxwing <zsxwing@gmail.com>
Closes#2875 from zsxwing/SPARK-4035 and squashes the following commits:
ce347e2 [zsxwing] Fix a wrong format specifier
This PR makes several changes to TorrentBroadcast in order to make
it easier to reason about, which should help when debugging SPARK-3958.
The key changes:
- Remove all state from the global TorrentBroadcast object. This state
consisted mainly of configuration options, like the block size and
compression codec, and was read by the blockify / unblockify methods.
Unfortunately, the use of `lazy val` for `BLOCK_SIZE` meant that the block
size was always determined by the first SparkConf that TorrentBroadast was
initialized with; as a result, unit tests could not properly test
TorrentBroadcast with different block sizes.
Instead, blockifyObject and unBlockifyObject now accept compression codecs
and blockSizes as arguments. These arguments are supplied at the call sites
inside of TorrentBroadcast instances. Each TorrentBroadcast instance
determines these values from SparkEnv's SparkConf. I was careful to ensure
that we do not accidentally serialize CompressionCodec or SparkConf objects
as part of the TorrentBroadcast object.
- Remove special-case handling of local-mode in TorrentBroadcast. I don't
think that broadcast implementations should know about whether we're running
in local mode. If we want to optimize the performance of broadcast in local
mode, then we should detect this at a higher level and use a dummy
LocalBroadcastFactory implementation instead.
Removing this code fixes a subtle error condition: in the old local mode
code, a failure to find the broadcast in the local BlockManager would lead
to an attempt to deblockify zero blocks, which could lead to confusing
deserialization or decompression errors when we attempted to decompress
an empty byte array. This should never have happened, though: a failure to
find the block in local mode is evidence of some other error. The changes
here will make it easier to debug those errors if they ever happen.
- Add a check that throws an exception when attempting to deblockify an
empty array.
- Use ScalaCheck to add a test to check that TorrentBroadcast's
blockifyObject and unBlockifyObject methods are inverses.
- Misc. cleanup and logging improvements.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#2844 from JoshRosen/torrentbroadcast-bugfix and squashes the following commits:
1e8268d [Josh Rosen] Address Reynold's review comments
2a9fdfd [Josh Rosen] Address Reynold's review comments.
c3b08f9 [Josh Rosen] Update TorrentBroadcast tests to reflect removal of special local-mode optimizations.
5c22782 [Josh Rosen] Store broadcast variable's value in the driver.
33fc754 [Josh Rosen] Change blockify/unblockifyObject to accept serializer as argument.
618a872 [Josh Rosen] [SPARK-3958] TorrentBroadcast cleanup / debugging improvements.
Before, if the master node is killed and restarted, the worker nodes
would not attempt to reconnect to the Master. Therefore, when the Master
node was restarted, the worker nodes needed to be restarted as well.
Now, when the Master node is disconnected, the worker nodes will
continuously ping the master node in attempts to reconnect to it. Once
the master node restarts, it will detect one of the registration
requests from its former workers. The result is that the cluster
re-enters a healthy state.
In addition, when the master does not receive a heartbeat from the
worker, the worker was removed; however, when the worker sent a
heartbeat to the master, the master used to ignore the heartbeat. Now,
a master that receives a heartbeat from a worker that had been
disconnected will request the worker to re-attempt the registration
process, at which point the worker will send a RegisterWorker request
and be re-connected accordingly.
Re-connection attempts per worker are submitted every N seconds, where N
is configured by the property spark.worker.reconnect.interval - this has
a default of 60 seconds right now.
Author: mcheah <mcheah@palantir.com>
Closes#2828 from mccheah/reconnect-dead-workers and squashes the following commits:
83f8bc9 [mcheah] [SPARK-3736] More informative log message, and fixing some indentation.
fe0e02f [mcheah] [SPARK-3736] Moving reconnection logic to registerWithMaster().
94ddeca [mcheah] [SPARK-3736] Changing a log warning to a log info.
a698e35 [mcheah] [SPARK-3736] Addressing PR comment to make some defs private.
b9a3077 [mcheah] [SPARK-3736] Addressing PR comments related to reconnection.
2ad5ed5 [mcheah] [SPARK-3736] Cancel attempts to reconnect if the master changes.
b5b34af [mcheah] [SPARK-3736] Workers reconnect when disassociated from the master.
The problem caused by #1966
CC YanTangZhai andrewor14
Author: GuoQiang Li <witgo@qq.com>
Closes#2858 from witgo/SPARK-4010 and squashes the following commits:
9866fbf [GuoQiang Li] Spark UI returns 500 in yarn-client mode
Kernel 2.6.32 bug will lead to unexpected behavior of transferTo in copyStream, and this will corrupt the shuffle output file in sort-based shuffle, which will somehow introduce PARSING_ERROR(2), deserialization error or offset out of range. Here fix this by adding append flag, also add some position checking code. Details can be seen in [SPARK-3948](https://issues.apache.org/jira/browse/SPARK-3948).
Author: jerryshao <saisai.shao@intel.com>
Closes#2824 from jerryshao/SPARK-3948 and squashes the following commits:
be0533a [jerryshao] Address the comments
a82b184 [jerryshao] add configuration to control the NIO way of copying stream
e17ada2 [jerryshao] Fix kernel 2.6.32 bug led unexpected behavior of transferTo
This PR adds a Java API for AsyncRDDActions and promotes the API from `Experimental` to stable.
Author: Josh Rosen <joshrosen@apache.org>
Author: Josh Rosen <joshrosen@databricks.com>
Closes#2760 from JoshRosen/async-rdd-actions-in-java and squashes the following commits:
0d45fbc [Josh Rosen] Whitespace fix.
ad3ae53 [Josh Rosen] Merge remote-tracking branch 'origin/master' into async-rdd-actions-in-java
c0153a5 [Josh Rosen] Remove unused variable.
e8e2867 [Josh Rosen] Updates based on Marcelo's review feedback
7a1417f [Josh Rosen] Removed unnecessary java.util import.
6f8f6ac [Josh Rosen] Fix import ordering.
ff28e49 [Josh Rosen] Add MiMa excludes and fix a scalastyle error.
346e46e [Josh Rosen] [SPARK-3902] Stabilize AsyncRDDActions; add Java API.
This patch attempts to fix SPARK-2546 in `branch-1.0` and `branch-1.1`. The underlying problem is that thread-safety issues in Hadoop Configuration objects may cause Spark tasks to get stuck in infinite loops. The approach taken here is to clone a new copy of the JobConf for each task rather than sharing a single copy between tasks. Note that there are still Configuration thread-safety issues that may affect the driver, but these seem much less likely to occur in practice and will be more complex to fix (see discussion on the SPARK-2546 ticket).
This cloning is guarded by a new configuration option (`spark.hadoop.cloneConf`) and is disabled by default in order to avoid unexpected performance regressions for workloads that are unaffected by the Configuration thread-safety issues.
Author: Josh Rosen <joshrosen@apache.org>
Closes#2684 from JoshRosen/jobconf-fix-backport and squashes the following commits:
f14f259 [Josh Rosen] Add configuration option to control cloning of Hadoop JobConf.
b562451 [Josh Rosen] Remove unused jobConfCacheKey field.
dd25697 [Josh Rosen] [SPARK-2546] [1.0 / 1.1 backport] Clone JobConf for each task.
(cherry picked from commit 2cd40db2b3)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
Conflicts:
core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
Make JavaPairRDD.collectAsMap result Serializable since Java Maps generally are
Author: Sean Owen <sowen@cloudera.com>
Closes#2805 from srowen/SPARK-3926 and squashes the following commits:
ecb78ee [Sean Owen] Fix conflict between java.io.Serializable and use of Scala's Serializable
f4717f9 [Sean Owen] Oops, fix compile problem
ae1b36f [Sean Owen] Expand to cover Maps returned from other Java API methods as well
51c26c2 [Sean Owen] Make JavaPairRDD.collectAsMap result Serializable since Java Maps generally are
There is a unused variable(count) in saveAsHadoopDataset in PairRDDFunctions.scala. The initial idea of this variable seems to count the number of records, so I am adding a log statement to log the number of records that has been written to the writer.
Author: likun <jacky.likun@huawei.com>
Author: jackylk <jacky.likun@huawei.com>
Closes#2791 from jackylk/SPARK-3935 and squashes the following commits:
a874047 [jackylk] removing the unused variable in PairRddFunctions.scala
3bf43c7 [likun] log the number of records has been written
Its hard to debug which broadcast variables refer to what in a big codebase. Printing call site information helps in debugging.
Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
Closes#2829 from shivaram/spark-broadcast-print and squashes the following commits:
cd6dbdf [Shivaram Venkataraman] Print call site information for broadcasts
JobProgressPage could not show Fair Scheduler Pools section sometimes.
SparkContext starts webui and then postEnvironmentUpdate. Sometimes JobProgressPage is accessed between webui starting and postEnvironmentUpdate, then the lazy val isFairScheduler will be false. The Fair Scheduler Pools section will not display any more.
Author: yantangzhai <tyz0303@163.com>
Author: YanTangZhai <hakeemzhai@tencent.com>
Closes#1966 from YanTangZhai/SPARK-3067 and squashes the following commits:
d4323f8 [yantangzhai] update [SPARK-3067] JobProgressPage could not show Fair Scheduler Pools section sometimes
8a00106 [YanTangZhai] Merge pull request #6 from apache/master
b6391cc [yantangzhai] revert [SPARK-3067] JobProgressPage could not show Fair Scheduler Pools section sometimes
d2226cd [yantangzhai] [SPARK-3067] JobProgressPage could not show Fair Scheduler Pools section sometimes
cbcba66 [YanTangZhai] Merge pull request #3 from apache/master
aac7f7b [yantangzhai] [SPARK-3067] JobProgressPage could not show Fair Scheduler Pools section sometimes
cdef539 [YanTangZhai] Merge pull request #1 from apache/master
Sorry. I found that I forgot to add `afterExecute` for `handleConnectExecutor` in #2593.
Author: zsxwing <zsxwing@gmail.com>
Closes#2794 from zsxwing/SPARK-3741 and squashes the following commits:
a0bc4dd [zsxwing] Add afterExecute for handleConnectExecutor
In BlockManagermasterActor, _remainingMem would increase memSize for twice when updateBlockInfo if new storageLevel is invalid and old storageLevel is "useMemory". Also, _remainingMem should increase with original memory size instead of new memSize.
Author: Zhang, Liye <liye.zhang@intel.com>
Closes#2792 from liyezhang556520/spark-3941-remainMem and squashes the following commits:
3d487cc [Zhang, Liye] make the code concise
0380a32 [Zhang, Liye] [SPARK-3941][CORE] _remainingmem should not increase twice when updateBlockInfo
Something about the 2.3.4 upgrade seems to have made the issue manifest where all the services disconnect from each other after exactly 1000 seconds (which is the heartbeat interval). [This post](https://groups.google.com/forum/#!topic/akka-user/X3xzpTCbEFs) suggests that heartbeat pause should be greater than heartbeat interval, and increasing the pause from 600s to 6000s seems to have rectified the issue. My current cluster has now exceeded 1400s of uptime without failure!
I do not know why this fixed it, because the threshold we have set for the failure detector is the exponent of a timeout, and 300 is extremely large. Perhaps the default failure detector changed in 2.3.4 and now ignores threshold.
Author: Aaron Davidson <aaron@databricks.com>
Closes#2784 from aarondav/fix-timeout and squashes the following commits:
bd1151a [Aaron Davidson] Increase pause, don't decrease interval
9cb0372 [Aaron Davidson] [SPARK-3923] Decrease Akka heartbeat interval below heartbeat pause
This is a small number of clean-up changes on top of #2782. Closes#2782.
Author: Prashant Sharma <prashant.s@imaginea.com>
Author: Patrick Wendell <pwendell@gmail.com>
Closes#2803 from pwendell/pr-2782 and squashes the following commits:
56d5b7a [Patrick Wendell] Minor clean-up
44089ec [Patrick Wendell] Clean-up the TaskContext API.
ed551ce [Prashant Sharma] Fixed a typo
df261d0 [Prashant Sharma] Josh's suggestion
facf3b1 [Prashant Sharma] Fixed the mima issue.
7ecc2fe [Prashant Sharma] CR, Moved implementations to TaskContextImpl
bbd9e05 [Prashant Sharma] adding missed out files to git.
ef633f5 [Prashant Sharma] SPARK-3874, Provide stable TaskContext API
Customized pickler should be registered before unpickling, but in executor, there is no way to register the picklers before run the tasks.
So, we need to register the picklers in the tasks itself, duplicate the javaToPython() and pythonToJava() in MLlib, call SerDe.initialize() before pickling or unpickling.
Author: Davies Liu <davies.liu@gmail.com>
Closes#2830 from davies/fix_pickle and squashes the following commits:
0c85fb9 [Davies Liu] revert the privacy change
6b94e15 [Davies Liu] use JavaConverters instead of JavaConversions
0f02050 [Davies Liu] hotfix: Customized pickler does not work in cluster
Author: Shiti <ssaxena.ece@gmail.com>
Closes#2810 from Shiti/master and squashes the following commits:
051d82f [Shiti] setting the default value of uri scheme to "file" where matching "file" or None yields the same result
This is another implementation about #1256
cc andrewor14 vanzin
Author: GuoQiang Li <witgo@qq.com>
Closes#2379 from witgo/SPARK-2098-new and squashes the following commits:
4ef1cbd [GuoQiang Li] review commit
49ef70e [GuoQiang Li] Refactor getDefaultPropertiesFile
c45d20c [GuoQiang Li] All Spark processes should support spark-defaults.conf, config file
Author: shitis <ssaxena.ece@gmail.com>
Closes#2795 from Shiti/master and squashes the following commits:
46897d7 [shitis] Using Option Wrapper to convert String with value null to None
Validate the memory is greater than zero when set from the SPARK_WORKER_MEMORY environment variable or command line without a g or m label. Added unit tests. If memory is 0 an IllegalStateException is thrown. Updated unit tests to mock environment variables by subclassing SparkConf (tip provided by Josh Rosen). Updated WorkerArguments to use SparkConf.getenv instead of System.getenv for reading the SPARK_WORKER_MEMORY environment variable.
Author: Bill Bejeck <bbejeck@gmail.com>
Closes#2309 from bbejeck/spark-memory-worker and squashes the following commits:
51cf915 [Bill Bejeck] SPARK-3178 - Validate the memory is greater than zero when set from the SPARK_WORKER_MEMORY environment variable or command line without a g or m label. Added unit tests. If memory is 0 an IllegalStateException is thrown. Updated unit tests to mock environment variables by subclassing SparkConf (tip provided by Josh Rosen). Updated WorkerArguments to use SparkConf.getenv instead of System.getenv for reading the SPARK_WORKER_MEMORY environment variable.
The goal of this patch is to fix the swapped arguments in standalone mode, which was caused by 79e45c9323 (diff-79391110e9f26657e415aa169a004998R153).
More details can be found in the JIRA: [SPARK-3921](https://issues.apache.org/jira/browse/SPARK-3921)
Tested in Standalone mode, but not in Mesos.
Author: Aaron Davidson <aaron@databricks.com>
Closes#2779 from aarondav/fix-standalone and squashes the following commits:
725227a [Aaron Davidson] Fix ExecutorRunnerTest
9d703fe [Aaron Davidson] [SPARK-3921] Fix CoarseGrainedExecutorBackend's arguments for Standalone mode
In the comment (Line 1083), it says: "Otherwise, interpolate the number of partitions we need to try, but overestimate it by 50%."
`(1.5 * num * partsScanned / buf.size).toInt` is the guess of "num of total partitions needed". In every iteration, we should consider the increment `(1.5 * num * partsScanned / buf.size).toInt - partsScanned`
Existing implementation 'exponentially' grows `partsScanned ` ( roughly: `x_{n+1} >= (1.5 + 1) x_n`)
This could be a performance problem. (unless this is the intended behavior)
Author: yingjieMiao <yingjie@42go.com>
Closes#2648 from yingjieMiao/rdd_take and squashes the following commits:
d758218 [yingjieMiao] scala style fix
a8e74bb [yingjieMiao] python style fix
4b6e777 [yingjieMiao] infix operator style fix
4391d3b [yingjieMiao] typo fix.
692f4e6 [yingjieMiao] cap numPartsToTry
c4483dc [yingjieMiao] style fix
1d2c410 [yingjieMiao] also change in rdd.py and AsyncRDD
d31ff7e [yingjieMiao] handle the edge case after 1 iteration
a2aa36b [yingjieMiao] RDD take method: overestimate too much
Author: GuoQiang Li <witgo@qq.com>
Closes#2763 from witgo/SPARK-3905 and squashes the following commits:
17d7990 [GuoQiang Li] The keys for sorting the columns of Executor page ,Stage page Storage page are incorrect
val path = ... //path to seq file with BytesWritable as type of both key and value
val file = sc.sequenceFile[Array[Byte],Array[Byte]](path)
file.take(1)(0)._1
This prints incorrect content of byte array. Actual content starts with correct one and some "random" bytes and zeros are appended. BytesWritable has two methods:
getBytes() - return content of all internal array which is often longer then actual value stored. It usually contains the rest of previous longer values
copyBytes() - return just begining of internal array determined by internal length property
It looks like in implicit conversion between BytesWritable and Array[byte] getBytes is used instead of correct copyBytes.
dbtsai
Author: Jakub Dubovský <james64@inMail.sk>
Author: Dubovsky Jakub <dubovsky@avast.com>
Closes#2712 from james64/3121-bugfix and squashes the following commits:
f85d24c [Jakub Dubovský] Test name changed, comments added
1b20d51 [Jakub Dubovský] Import placed correctly
406e26c [Jakub Dubovský] Scala style fixed
f92ffa6 [Dubovsky Jakub] performance tuning
480f9cd [Dubovsky Jakub] Bug 3121 fixed
When reporting that a remote error occurred, the ConnectionManager should also log the stacktrace of the remote exception. This PR accomplishes this by sending the remote exception's stacktrace as the payload in the "negative ACK / error message."
Author: Josh Rosen <joshrosen@apache.org>
Closes#2741 from JoshRosen/propagate-cm-exceptions-to-sender and squashes the following commits:
b5366cc [Josh Rosen] Explicitly encode error messages using UTF-8.
cef18b3 [Josh Rosen] [SPARK-3887] Send stracktrace in ConnectionManager error messages.
...riden alternatives, can have default argument.
Author: Prashant Sharma <prashant.s@imaginea.com>
Closes#2750 from ScrapCodes/SPARK-2924/default-args-removed and squashes the following commits:
d9785c3 [Prashant Sharma] [SPARK-2924] Required by scala 2.11, only one function/ctor amongst overriden alternatives, can have default argument.
In general, individual shuffle blocks are frequently small, so mmapping them often creates a lot of waste. It may not be bad to mmap the larger ones, but it is pretty inconvenient to get configuration into ManagedBuffer, and besides it is unlikely to help all that much.
Author: Aaron Davidson <aaron@databricks.com>
Closes#2742 from aarondav/mmap and squashes the following commits:
a152065 [Aaron Davidson] Add other pathway back
52b6cd2 [Aaron Davidson] [SPARK-3889] Attempt to avoid SIGBUS by not mmapping files in ConnectionManager
This is a second rev of the Akka upgrade (earlier merged, but reverted). I made a slight modification which is that I also upgrade Hive to deal with a compatibility issue related to the protocol buffers library.
Author: Anand Avati <avati@redhat.com>
Author: Patrick Wendell <pwendell@gmail.com>
Closes#2752 from pwendell/akka-upgrade and squashes the following commits:
4c7ca3f [Patrick Wendell] Upgrading to new hive->protobuf version
57a2315 [Anand Avati] SPARK-1812: streaming - remove tests which depend on akka.actor.IO
2a551d3 [Anand Avati] SPARK-1812: core - upgrade to akka 2.3.4
I noticed a few issues with how temp directories are created and deleted:
*Minor*
* Guava's `Files.createTempDir()` plus `File.deleteOnExit()` is used in many tests to make a temp dir, but `Utils.createTempDir()` seems to be the standard Spark mechanism
* Call to `File.deleteOnExit()` could be pushed into `Utils.createTempDir()` as well, along with this replacement
* _I messed up the message in an exception in `Utils` in SPARK-3794; fixed here_
*Bit Less Minor*
* `Utils.deleteRecursively()` fails immediately if any `IOException` occurs, instead of trying to delete any remaining files and subdirectories. I've observed this leave temp dirs around. I suggest changing it to continue in the face of an exception and throw one of the possibly several exceptions that occur at the end.
* `Utils.createTempDir()` will add a JVM shutdown hook every time the method is called. Even if the subdir is the parent of another parent dir, since this check is inside the hook. However `Utils` manages a set of all dirs to delete on shutdown already, called `shutdownDeletePaths`. A single hook can be registered to delete all of these on exit. This is how Tachyon temp paths are cleaned up in `TachyonBlockManager`.
I noticed a few other things that might be changed but wanted to ask first:
* Shouldn't the set of dirs to delete be `File`, not just `String` paths?
* `Utils` manages the set of `TachyonFile` that have been registered for deletion, but the shutdown hook is managed in `TachyonBlockManager`. Should this logic not live together, and not in `Utils`? it's more specific to Tachyon, and looks a slight bit odd to import in such a generic place.
Author: Sean Owen <sowen@cloudera.com>
Closes#2670 from srowen/SPARK-3811 and squashes the following commits:
071ae60 [Sean Owen] Update per @vanzin's review
da0146d [Sean Owen] Make Utils.deleteRecursively try to delete all paths even when an exception occurs; use one shutdown hook instead of one per method call to delete temp dirs
3a0faa4 [Sean Owen] Standardize on Utils.createTempDir instead of Files.createTempDir
This pull request addresses a few issues related to PySpark's IPython support:
- Fix the remaining uses of the '-u' flag, which IPython doesn't support (see SPARK-3772).
- Change PYSPARK_PYTHON_OPTS to PYSPARK_DRIVER_PYTHON_OPTS, so that the old name is reserved in case we ever want to allow the worker Python options to be customized (this variable was introduced in #2554 and hasn't landed in a release yet, so this doesn't break any compatibility).
- Introduce a PYSPARK_DRIVER_PYTHON option that allows the driver to use `ipython` while the workers use a different Python version.
- Attempt to use Python 2.7 by default if PYSPARK_PYTHON is not specified.
- Retain the old semantics for IPYTHON=1 and IPYTHON_OPTS (to avoid breaking existing example programs).
There are more details in a block comment in `bin/pyspark`.
Author: Josh Rosen <joshrosen@apache.org>
Closes#2651 from JoshRosen/SPARK-3772 and squashes the following commits:
7b8eb86 [Josh Rosen] More changes to PySpark python executable configuration:
c4f5778 [Josh Rosen] [SPARK-3772] Allow ipython to be used by Pyspark workers; IPython fixes:
...re logs to avoid Executors swallowing errors
This PR made the following changes:
* Register a callback to `Connection` so that the error will be propagated properly.
* Add more logs so that the errors won't be swallowed by Executors.
* Use trySuccess/tryFailure because `Promise` doesn't allow to call success/failure more than once.
Author: zsxwing <zsxwing@gmail.com>
Closes#2593 from zsxwing/SPARK-3741 and squashes the following commits:
1d5aed5 [zsxwing] Fix naming
0b8a61c [zsxwing] Merge branch 'master' into SPARK-3741
764aec5 [zsxwing] [SPARK-3741] Make ConnectionManager propagate errors properly and add more logs to avoid Executors swallowing errors
Truncate appName in WebUI if it is too long.
Author: Xiangrui Meng <meng@databricks.com>
Closes#2707 from mengxr/truncate-app-name and squashes the following commits:
87834ce [Xiangrui Meng] move scala import below java
c7111dc [Xiangrui Meng] truncate appName in WebUI if it is too long
Upgrade to akka 2.3.4
Author: Anand Avati <avati@redhat.com>
Closes#1685 from avati/SPARK-1812-akka-2.3 and squashes the following commits:
57a2315 [Anand Avati] SPARK-1812: streaming - remove tests which depend on akka.actor.IO
2a551d3 [Anand Avati] SPARK-1812: core - upgrade to akka 2.3.4