## What changes were proposed in this pull request?
For TransportFactory, the requests sent to the same address share a clientPool.
Specially, when the io.numConnectionPerPeer is 1, these requests would share a same client.
When this address is unreachable, the createClient operation would be still timeout.
And these requests would block each other during createClient, because there is a lock for this shared client.
It would cost connectionNum \* connectionTimeOut \* maxRetry to retry, and then fail the task.
It fact, it is expected that this task could fail in connectionTimeOut * maxRetry.
In this PR, I set a fastFail time window for the clientPool, if the last connection failed in this time window, the new connection would fast fail.
## Why are the changes needed?
It can save time for some cases.
## Does this PR introduce any user-facing change?
No.
## How was this patch tested?
Existing UT.
Closes#27943 from turboFei/SPARK-31179-fast-fail-connection.
Authored-by: turbofei <fwang12@ebay.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
### What changes were proposed in this pull request?
Fix the regression caused by #22173.
The original PR changes the logic of handling `ChunkFetchReqeust` from async to sync, that's causes the shuffle benchmark regression. This PR fixes the regression back to the async mode by reusing the config `spark.shuffle.server.chunkFetchHandlerThreadsPercent`.
When the user sets the config, ChunkFetchReqeust will be processed in a separate event loop group, otherwise, the code path is exactly the same as before.
### Why are the changes needed?
Fix the shuffle performance regression described in https://github.com/apache/spark/pull/22173#issuecomment-572459561
### Does this PR introduce any user-facing change?
Yes, this PR disable the separate event loop for FetchRequest by default.
### How was this patch tested?
Existing UT.
Closes#27665 from xuanyuanking/SPARK-24355-follow.
Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This patch is to bump the master branch version to 3.1.0-SNAPSHOT.
### Why are the changes needed?
N/A
### Does this PR introduce any user-facing change?
N/A
### How was this patch tested?
N/A
Closes#27698 from gatorsmile/updateVersion.
Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
Adding a dedicated boss event loop group to the Netty pipeline in the External Shuffle Service to avoid the delay in channel registration.
```
EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1,
conf.getModuleName() + "-boss");
EventLoopGroup workerGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(),
conf.getModuleName() + "-server");
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NettyUtils.getServerChannelClass(ioMode))
.option(ChannelOption.ALLOCATOR, allocator)
```
### Why are the changes needed?
We have been seeing a large number of SASL authentication (RPC requests) timing out with the external shuffle service.
```
java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
at org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
```
The investigation that we have done is described here:
https://github.com/netty/netty/issues/9890
After adding `LoggingHandler` to the netty pipeline, we saw that the registration of the channel was getting delay which is because the worker threads are busy with the existing channels.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
We have tested the patch on our clusters and with a stress testing tool. After this change, we didn't see any SASL requests timing out. Existing unit tests pass.
Closes#27240 from otterc/SPARK-30512.
Authored-by: Chandni Singh <chsingh@linkedin.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
The default value for backLog set back to -1, as any other value may break existing configuration by overriding Netty's default io.netty.util.NetUtil#SOMAXCONN. The documentation accordingly adjusted.
See discussion thread: https://github.com/apache/spark/pull/24732
### What changes were proposed in this pull request?
Partial rollback of https://github.com/apache/spark/pull/24732 (default for backLog set back to -1).
### Why are the changes needed?
Previous change introduces backward incompatibility by overriding default of Netty's `io.netty.util.NetUtil#SOMAXCONN`
Closes#27230 from xCASx/master.
Authored-by: Maxim Kolesnikov <swe.kolesnikov@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Using compound operations as well as increments and decrements on primitive fields are not atomic operations. Here when volatile primitive field is incremented or decremented, we run into data loss if threads interleave in steps of update.
Refer: https://wiki.sei.cmu.edu/confluence/display/java/VNA02-J.+Ensure+that+compound+operations+on+shared+variables+are+atomic
### What changes were proposed in this pull request?
Using `AtomicLong` instead of `long`
### Why are the changes needed?
volatile primitive field is incremented or decremented, we run into data loss if threads interleave in steps of update.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
All Existing UT can pass with the Change
Closes#27071 from ajithme/atomic.
Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
Remove usages of Guava that no longer work in Guava 27, and replace with workalikes. I'll comment on key types of changes below.
### Why are the changes needed?
Hadoop 3.2.1 uses Guava 27, so this helps us avoid problems running on Hadoop 3.2.1+ and generally lowers our exposure to Guava.
### Does this PR introduce any user-facing change?
Should not be, but see notes below on hash codes and toString.
### How was this patch tested?
Existing tests will verify whether these changes break anything for Guava 14.
I manually built with an updated version and it compiles with Guava 27; tests running manually locally now.
Closes#26911 from srowen/SPARK-30272.
Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
1. Revert "Preparing development version 3.0.1-SNAPSHOT": 56dcd79
2. Revert "Preparing Spark release v3.0.0-preview2-rc2": c216ef1
### Why are the changes needed?
Shouldn't change master.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
manual test:
https://github.com/apache/spark/compare/5de5e46..wangyum:revert-masterCloses#26915 from wangyum/revert-master.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <wgyumg@gmail.com>
The new auth code was missing this bit, so it was not possible to know which
app a client belonged to when auth was on.
I also refactored the SASL test that checks for this so it also checks the
new protocol (test failed before the fix, passes now).
Closes#26760 from vanzin/SPARK-30129.
Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This change adds a profile to switch to use the right leveldbjni package according to the platforms:
aarch64 uses org.openlabtesting.leveldbjni:leveldbjni-all.1.8, and other platforms use the old one org.fusesource.leveldbjni:leveldbjni-all.1.8.
And because some hadoop dependencies packages are also depend on org.fusesource.leveldbjni:leveldbjni-all, but hadoop merge the similar change on trunk, details see
https://issues.apache.org/jira/browse/HADOOP-16614, so exclude the dependency of org.fusesource.leveldbjni for these hadoop packages related.
Then Spark can build/test on aarch64 platform successfully.
Closes#26636 from huangtianhua/add-aarch64-leveldbjni.
Authored-by: huangtianhua <huangtianhua@huawei.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
- Correctly release `ByteBuf` in `TransportCipher` in all cases
- Move closing / releasing logic to `handlerRemoved(...)` so we are guaranteed that is always called.
- Correctly release `frameBuf` it is not null when the handler is removed (and so also when the channel becomes inactive)
### Why are the changes needed?
We need to carefully manage the ownership / lifecycle of `ByteBuf` instances so we don't leak any of these. We did not correctly do this in all cases:
- when end up in invalid cipher state.
- when partial data was received and the channel is closed before the full frame is decoded
Fixes https://github.com/netty/netty/issues/9784.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Pass the newly added UTs.
Closes#26609 from normanmaurer/fix_leaks.
Authored-by: Norman Maurer <norman_maurer@apple.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
### What changes were proposed in this pull request?
The `assertEquals` method of JUnit Assert requires the first parameter to be the expected value. In this PR, I propose to change the order of parameters when the expected value is passed as the second parameter.
### Why are the changes needed?
Wrong order of assert parameters confuses when the assert fails and the parameters have special string representation. For example:
```java
assertEquals(input1.add(input2), new CalendarInterval(5, 5, 367200000000L));
```
```
java.lang.AssertionError:
Expected :interval 5 months 5 days 101 hours
Actual :interval 5 months 5 days 102 hours
```
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
By existing tests.
Closes#26377 from MaxGekk/fix-order-in-assert-equals.
Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
To push the built jars to maven release repository, we need to remove the 'SNAPSHOT' tag from the version name.
Made the following changes in this PR:
* Update all the `3.0.0-SNAPSHOT` version name to `3.0.0-preview`
* Update the sparkR version number check logic to allow jvm version like `3.0.0-preview`
**Please note those changes were generated by the release script in the past, but this time since we manually add tags on master branch, we need to manually apply those changes too.**
We shall revert the changes after 3.0.0-preview release passed.
### Why are the changes needed?
To make the maven release repository to accept the built jars.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
N/A
### What changes were proposed in this pull request?
To push the built jars to maven release repository, we need to remove the 'SNAPSHOT' tag from the version name.
Made the following changes in this PR:
* Update all the `3.0.0-SNAPSHOT` version name to `3.0.0-preview`
* Update the PySpark version from `3.0.0.dev0` to `3.0.0`
**Please note those changes were generated by the release script in the past, but this time since we manually add tags on master branch, we need to manually apply those changes too.**
We shall revert the changes after 3.0.0-preview release passed.
### Why are the changes needed?
To make the maven release repository to accept the built jars.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
N/A
Closes#26243 from jiangxb1987/3.0.0-preview-prepare.
Lead-authored-by: Xingbo Jiang <xingbo.jiang@databricks.com>
Co-authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
After the newly added shuffle block fetching protocol in #24565, we can keep this work by extending the FetchShuffleBlocks message.
### What changes were proposed in this pull request?
In this patch, we achieve the indeterminate shuffle rerun by reusing the task attempt id(unique id within an application) in shuffle id, so that each shuffle write attempt has a different file name. For the indeterministic stage, when the stage resubmits, we'll clear all existing map status and rerun all partitions.
All changes are summarized as follows:
- Change the mapId to mapTaskAttemptId in shuffle related id.
- Record the mapTaskAttemptId in MapStatus.
- Still keep mapId in ShuffleFetcherIterator for fetch failed scenario.
- Add the determinate flag in Stage and use it in DAGScheduler and the cleaning work for the intermediate stage.
### Why are the changes needed?
This is a follow-up work for #22112's future improvment[1]: `Currently we can't rollback and rerun a shuffle map stage, and just fail.`
Spark will rerun a finished shuffle write stage while meeting fetch failures, currently, the rerun shuffle map stage will only resubmit the task for missing partitions and reuse the output of other partitions. This logic is fine in most scenarios, but for indeterministic operations(like repartition), multiple shuffle write attempts may write different data, only rerun the missing partition will lead a correctness bug. So for the shuffle map stage of indeterministic operations, we need to support rolling back the shuffle map stage and re-generate the shuffle files.
### Does this PR introduce any user-facing change?
Yes, after this PR, the indeterminate stage rerun will be accepted by rerunning the whole stage. The original behavior is aborting the stage and fail the job.
### How was this patch tested?
- UT: Add UT for all changing code and newly added function.
- Manual Test: Also providing a manual test to verify the effect.
```
import scala.sys.process._
import org.apache.spark.TaskContext
val determinateStage0 = sc.parallelize(0 until 1000 * 1000 * 100, 10)
val indeterminateStage1 = determinateStage0.repartition(200)
val indeterminateStage2 = indeterminateStage1.repartition(200)
val indeterminateStage3 = indeterminateStage2.repartition(100)
val indeterminateStage4 = indeterminateStage3.repartition(300)
val fetchFailIndeterminateStage4 = indeterminateStage4.map { x =>
if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId == 190 &&
TaskContext.get.stageAttemptNumber == 0) {
throw new Exception("pkill -f -n java".!!)
}
x
}
val indeterminateStage5 = fetchFailIndeterminateStage4.repartition(200)
val finalStage6 = indeterminateStage5.repartition(100).collect().distinct.length
```
It's a simple job with multi indeterminate stage, it will get a wrong answer while using old Spark version like 2.2/2.3, and will be killed after #22112. With this fix, the job can retry all indeterminate stage as below screenshot and get the right result.
![image](https://user-images.githubusercontent.com/4833765/63948434-3477de00-caab-11e9-9ed1-75abfe6d16bd.png)
Closes#25620 from xuanyuanking/SPARK-25341-8.27.
Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
TransportClientFactory.createClient() is called by task and TransportClientFactory.close() is called by executor.
When stop the executor, close() will set workerGroup = null, NPE will occur in createClient which generate many exception in log.
For exception occurs after close(), treated it as an expected Exception
and transform it to InterruptedException which can be processed by Executor.
### Why are the changes needed?
The change can reduce the exception stack trace in log file, and user won't be confused by these excepted exception.
### Does this PR introduce any user-facing change?
N/A
### How was this patch tested?
New tests are added in TransportClientFactorySuite and ExecutorSuite
Closes#25759 from colinmjj/spark-19147.
Authored-by: colinma <colinma@tencent.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
### What changes were proposed in this pull request?
This is a follow-up of https://github.com/apache/spark/pull/25638 to switch `scala-library` from `test` dependency to `compile` dependency in `network-common` module.
### Why are the changes needed?
Previously, we added `scala-library` as a test dependency to resolve the followings, but it was insufficient to resolve. This PR aims to switch it to compile dependency.
```
$ java -version
openjdk version "11.0.3" 2019-04-16
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.3+7)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.3+7, mixed mode)
$ mvn clean install -pl common/network-common -DskipTests
...
[INFO] --- scala-maven-plugin:4.2.0:doc-jar (attach-scaladocs) spark-network-common_2.12 ---
error: fatal error: object scala in compiler mirror not found.
one error found
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
```
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Manually, run the following on JDK11.
```
$ mvn clean install -pl common/network-common -DskipTests
```
Closes#25800 from dongjoon-hyun/SPARK-28932-2.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?
This PR adds `scala-library` test dependency to `network-common` module for JDK11.
### Why are the changes needed?
In JDK11, the following command fails due to scala library.
```
mvn clean install -pl common/network-common -DskipTests
```
**BEFORE**
```
...
error: fatal error: object scala in compiler mirror not found.
one error found
...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
```
**AFTER**
```
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
```
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Manual. On JDK11, do the following.
```
mvn clean install -pl common/network-common -DskipTests
```
Closes#25638 from dongjoon-hyun/SPARK-28932.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
This PR fixed typos in comments and replace the explicit type with '<>' for Java 8+.
## How was this patch tested?
Manually tested.
Closes#25338 from younggyuchun/younggyu.
Authored-by: younggyu chun <younggyuchun@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This is very like #23590 .
`ByteBuffer.allocate` may throw `OutOfMemoryError` when the response is large but no enough memory is available. However, when this happens, `TransportClient.sendRpcSync` will just hang forever if the timeout set to unlimited.
This PR catches `Throwable` and uses the error to complete `SettableFuture`.
## How was this patch tested?
I tested in my IDE by setting the value of size to -1 to verify the result. Without this patch, it won't be finished until timeout (May hang forever if timeout set to MAX_INT), or the expected `IllegalArgumentException` will be caught.
```java
Override
public void onSuccess(ByteBuffer response) {
try {
int size = response.remaining();
ByteBuffer copy = ByteBuffer.allocate(size); // set size to -1 in runtime when debug
copy.put(response);
// flip "copy" to make it readable
copy.flip();
result.set(copy);
} catch (Throwable t) {
result.setException(t);
}
}
```
Closes#24964 from LantaoJin/SPARK-28160.
Lead-authored-by: LantaoJin <jinlantao@gmail.com>
Co-authored-by: lajin <lajin@ebay.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
See https://github.com/apache/spark/pull/24702/files#r296765487 -- this just fixes a Java style error. I'm not clear why the PR build didn't catch it.
## How was this patch tested?
N/A
Closes#24951 from srowen/SPARK-27989.2.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Disabled negative dns caching for docker images
Improved logging on DNS resolution, convenient for slow k8s clusters
## What changes were proposed in this pull request?
Added retries when building the connection to the driver in K8s.
In some scenarios DNS reslution can take more than the timeout.
Also openjdk-8 by default has negative dns caching enabled, which means even retries may not help depending on the times.
## How was this patch tested?
This patch was tested agains an specific k8s cluster with slow response time in DNS to ensure it woks.
Closes#24702 from jlpedrosa/feature/kuberetries.
Authored-by: Jose Luis Pedrosa <jlpedrosa@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
First, there is currently no public documentation for this setting. So it's hard
to even know that it could be a problem if your application starts failing with
weird shuffle errors.
Second, the javadoc attached to the code was incorrect; the default value just uses
the default value from the JRE, which is 50, instead of having an unbounded queue
as the comment implies.
So use a default that is a "rounded" version of the JRE default, and provide
documentation explaining that this value may need to be adjusted. Also added
a log message that was very helpful in debugging an issue caused by this
problem.
Closes#24732 from vanzin/SPARK-27868.
Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
`ExecutorClassLoader`'s `findClass` may fail to fetch a class due to transient exceptions. For example, when a task is interrupted, if `ExecutorClassLoader` is fetching a class, you may see `InterruptedException` or `IOException` wrapped by `ClassNotFoundException`, even if this class can be loaded. Then the result of `findClass` will be cached by JVM, and later when the same class is being loaded in the same executor, it will just throw NoClassDefFoundError even if the class can be loaded.
I found JVM only caches `LinkageError` and `ClassNotFoundException`. Hence in this PR, I changed ExecutorClassLoader to throw `RemoteClassLoadedError` if we cannot get a response from driver.
## How was this patch tested?
New unit tests.
Closes#24683 from zsxwing/SPARK-20547-fix.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
## What changes were proposed in this pull request?
As the current approach in OneForOneBlockFetcher, we reuse the OpenBlocks protocol to describe the fetch request for shuffle blocks, and it causes the extension work for shuffle fetching like #19788 and #24110 very awkward.
In this PR, we split the fetch request for shuffle blocks from OpenBlocks which named FetchShuffleBlocks. It's a loose bind with ShuffleBlockId and can easily extend by adding new fields in this protocol.
## How was this patch tested?
Existing and new added UT.
Closes#24565 from xuanyuanking/SPARK-27665.
Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
# What changes were proposed in this pull request?
## Problem statement
An executor which has persisted blocks does not consider to be idle and this way ready to be released by dynamic allocation after the regular timeout `spark.dynamicAllocation.executorIdleTimeout` but there is separate configuration `spark.dynamicAllocation.cachedExecutorIdleTimeout` which defaults to `Integer.MAX_VALUE`. This is because releasing the executor also means losing the persisted blocks (as the metadata for individual blocks called `BlockInfo` are kept in memory) and when the RDD is referenced latter on this lost blocks will be recomputed.
On the other hand keeping the executors too long without any task to work on is also a waste of resources (as executors are reserved for the application by the resource manager).
## Solution
This PR focuses on the first part of SPARK-25888: it extends the external shuffle service with the capability to serve RDD blocks which are persisted on the local disk store by the executors. Moreover when this feature is enabled by setting the `spark.shuffle.service.fetch.rdd.enabled` config to true and a block is reported to be persisted on to disk the external shuffle service instance running on the same host as the executor is also registered (along with the reporting block manager) as a possible location for fetching it.
## Some implementation detail
Some explanation about the decisions made during the development:
- the location list to fetch a block was randomized but the groups (same host, same rack, others) order was kept. In this PR the order of groups are kept and external shuffle service added to the end of the each group.
- `BlockManagerInfo` is not introduced for external shuffle service but only a lightweight solution is taken. A hash map from `BlockId` to `BlockStatus` is introduced. A type alias would make the source more readable but I know it is discouraged. On the other hand a new class wrapping this hash map would introduce unnecessary indirection.
- when this feature is on the cleanup triggered during removing of executors (which is handled in `ExternalShuffleBlockResolver`) is modified to keep the disk persisted RDD blocks. This cleanup is triggered in standalone mode when the `spark.storage.cleanupFilesAfterExecutorExit` config is set.
- the unpersisting of an RDD is extended to use the external shuffle service for disk persisted RDD blocks when the original executor which created the blocks are already released. New block transport messages are introduced to support this: `RemoveBlocks` and `BlocksRemoved`.
# How was this patch tested?
## Unit tests
### ExternalShuffleServiceSuite
Here the complete use case is tested by the "SPARK-25888: using external shuffle service fetching disk persisted blocks" with a tiny difference: here the executor is killed manually, this way the test is a bit faster than waiting for the idle timeout.
### ExternalShuffleBlockHandlerSuite
Tests the fetching of the RDD blocks via the external shuffle service.
### BlockManagerInfoSuite
This a new suite. As the `BlockManagerInfo` behaviour depends very much on whether the external shuffle service enabled or not all the tests are executed with and without it.
### BlockManagerSuite
Tests the sorting of the block locations.
## Manually on YARN
Spark App was:
~~~scala
package com.mycompany
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
object TestAppDiskOnlyLevel {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("test-app")
println("Attila: START")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(0 until 100, 10)
.map { i =>
println(s"Attila: calculate first rdd i=$i")
Thread.sleep(1000)
i
}
rdd.persist(StorageLevel.DISK_ONLY)
rdd.count()
println("Attila: First RDD is processed, waiting for 60 sec")
Thread.sleep(60 * 1000)
println("Attila: Num executors must be 0 as executorIdleTimeout is way over")
val rdd2 = sc.parallelize(0 until 10, 1)
.map(i => (i, 1))
.persist(StorageLevel.DISK_ONLY)
rdd2.count()
println("Attila: Second RDD with one partition (only one executors must be alive)")
// reduce runs as user code to detect the empty seq (empty blocks)
println("Calling collect on the first RDD: " + rdd.collect().reduce(_ + _))
println("Attila: STOP")
}
}
~~~
I have submitted with the following configuration:
~~~bash
spark-submit --master yarn \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.executorIdleTimeout=30 \
--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=90 \
--class com.mycompany.TestAppDiskOnlyLevel dyn_alloc_demo-core_2.11-0.1.0-SNAPSHOT-jar-with-dependencies.jar
~~~
Checked the result by filtering for the side effect of the task calculations:
~~~bash
[userserver ~]$ yarn logs -applicationId application_1556299359453_0001 | grep "Attila: calculate" | wc -l
WARNING: YARN_OPTS has been replaced by HADOOP_OPTS. Using value of YARN_OPTS.
19/04/26 10:31:59 INFO client.RMProxy: Connecting to ResourceManager at apiros-1.gce.company.com/172.31.115.165:8032
100
~~~
So it is only 100 task execution and not 200 (which would be the case for re-computation).
Moreover from the submit/launcher log we can see executors really stopped in between (see the new total is 0 before the last line):
~~~
[userserver ~]$ grep "Attila: Num executors must be 0" -B 2 spark-submit.log
19/04/26 10:24:27 INFO cluster.YarnScheduler: Executor 9 on apiros-3.gce.company.com killed by driver.
19/04/26 10:24:27 INFO spark.ExecutorAllocationManager: Existing executor 9 has been removed (new total is 0)
Attila: Num executors must be 0 as executorIdleTimeout is way over
~~~
[Full spark submit log](https://github.com/attilapiros/spark/files/3122465/spark-submit.log)
I have done a test also after changing the `DISK_ONLY` storage level to `MEMORY_ONLY` for the first RDD. After this change during the 60sec waiting no executor was removed.
Closes#24499 from attilapiros/SPARK-25888-final.
Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
Right now there are several issues in `EncryptedMessage.transferTo`:
- When the underlying buffer has more than `1024 * 32` bytes (this should be rare but it could happen in error messages that send over the wire), it may just send a partial message as `EncryptedMessage.count` becomes less than `transferred`. This will cause the client hang forever (or timeout) as it will wait until receiving expected length of bytes, or weird errors (such as corruption or silent correctness issue) if the channel is reused by other messages.
- When the underlying buffer is full, it's still trying to write out bytes in a busy loop.
This PR fixes the issues in `EncryptedMessage.transferTo` and also makes it follow the contract of `FileRegion`:
- `count` should be a fixed value which is just the length of the whole message.
- It should be non-blocking. When the underlying socket is not ready to write, it should give up and give control back.
- `transferTo` should return the length of written bytes.
## How was this patch tested?
The new added tests.
Closes#24211 from zsxwing/fix-enc.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
When a timeout happens we don't know what's the state of the remote end,
so there is no point in doing anything else since it will most probably
fail anyway.
The change also demotes the log message printed when falling back to
SASL, since a warning is too noisy for when the fallback is really
needed (e.g. old shuffle service, or shuffle service with new auth
disabled).
Closes#24160 from vanzin/SPARK-27219.
Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
When TransportChannelHandler processes IdleStateEvent, it first calculates whether the last request time has timed out.
At this time, TransportClient.sendRpc initiates a request.
TransportChannelHandler gets responseHandler.numOutstandingRequests() > 0, causing the normal connection to be closed.
## How was this patch tested?
Closes#23989 from cxzl25/fix_IdleStateEvent_timeout.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
Creating an Netty `EventLoopGroup` leads to creating a new Thread pool for handling the events. For stopping the threads of the pool the event loop group should be shut down which is properly done for transport servers and clients by calling for example the `shutdownGracefully()` method (for details see the `close()` method of `TransportClientFactory` and `TransportServer`). But there is a separate event loop group for shuffle chunk fetch requests which is in pipeline for handling fetch request (shared between the client and server) and owned by the `TransportContext` and this was never shut down.
## How was this patch tested?
With existing unittest.
This leak is in the production system too but its effect is spiking in the unittest.
Checking the core unittest logs before the PR:
```
$ grep "LEAK IN SUITE" unit-tests.log | grep -o shuffle-chunk-fetch-handler | wc -l
381
```
And after the PR without whitelisting in thread audit and with an extra `await` after the
` chunkFetchWorkers.shutdownGracefully()`:
```
$ grep "LEAK IN SUITE" unit-tests.log | grep -o shuffle-chunk-fetch-handler | wc -l
0
```
Closes#23930 from attilapiros/SPARK-27021.
Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
Currently, TransportFrameDecoder will not consolidate the buffers read from network which may cause memory waste. Actually, bytebuf's writtenIndex is far less than it's capacity in most cases, so we can optimize it by doing consolidation.
This PR will do this optimization.
Related codes:
9a30e23211/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java (L143)
## How was this patch tested?
UT
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes#23602 from liupc/Reduce-memory-consumption-in-TransportFrameDecoder.
Lead-authored-by: liupengcheng <liupengcheng@xiaomi.com>
Co-authored-by: Liupengcheng <liupengcheng@xiaomi.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
How to reproduce
./build/mvn test -Dtest=org.apache.spark.network.RequestTimeoutIntegrationSuite,org.apache.spark.network.ChunkFetchIntegrationSuite -DwildcardSuites=None test
furtherRequestsDelay Test within RequestTimeoutIntegrationSuite was holding onto buffer references within worker threads. The test does close the server context but since the threads are global and there is sleep of 60 secs to fetch a specific chunk within this test, it grabs on it and waits for the client to consume but however the test is testing for a request timeout and it times out after 10 secs, so the workers are just waiting there for the buffer to be consumed by client as per my understanding.
This tends to happen if you dont have enough IO threads available on the specific system and also the order of the tests being run determines its flakyness like if ChunkFetchIntegrationSuite runs first then there is no issue. For example on mac with 8 threads these tests run fine but on my vm with 4 threads it fails. It matches the number of fetch calls in RequestTimeoutIntegrationSuite.
So do we really need it to be static?
I dont think this requires a global declaration as these threads are only required on the shuffle server end and on the client TransportContext initialization i.e the Client don't initialize these threads. The Shuffle Server initializes one TransportContext object. So, I think this is fine to be an instance variable and I see no harm.
## How was this patch tested?
Integration tests, manual tests
Closes#23700 from redsanket/SPARK-25692.
Authored-by: schintap <schintap@oath.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
In MessageEncoder.java, the header would always be allocated on onheap memory regardless of whether netty was configured to use/prefer onheap or offheap. By default this made netty allocate 16mb of onheap memory for a tiny header message. It would be more practical to use preallocated buffers.
Using a memory monitor tool on a simple spark application, the following services currently allocate 16 mb of onheap memory:
netty-rpc-client
netty-blockTransfer-client
netty-external-shuffle-client
With this change, the memory monitor tool reports all three of these services as using 0 b of onheap memory. The offheap memory allocation does not increase, but more of the already-allocated space is used.
## How was this patch tested?
Manually tested change using spark-memory-tool https://github.com/squito/spark-memoryCloses#22114 from NiharS/nettybuffer.
Lead-authored-by: Nihar Sheth <niharrsheth@gmail.com>
Co-authored-by: Nihar Sheth <nsheth@cloudera.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Now in `TransportRequestHandler.processStreamRequest`, when a stream request is processed, the stream id is not registered with the current channel in stream manager. It should do that so in case of that the channel gets terminated we can remove associated streams of stream requests too.
This also cleans up channel registration in `StreamManager`. Since `StreamManager` doesn't register channel but only `OneForOneStreamManager` does it, this removes `registerChannel` from `StreamManager`. When `OneForOneStreamManager` goes to register stream, it will also register channel for the stream.
## How was this patch tested?
Existing tests.
Closes#23521 from viirya/SPARK-26604.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Make it possible for the master to enable TCP keep alive on the RPC connections with clients.
## How was this patch tested?
Manually tested.
Added the following:
```
spark.rpc.io.enableTcpKeepAlive true
```
to spark-defaults.conf.
Observed the following on the Spark master:
```
$ netstat -town | grep 7077
tcp6 0 0 10.240.3.134:7077 10.240.1.25:42851 ESTABLISHED keepalive (6736.50/0/0)
tcp6 0 0 10.240.3.134:44911 10.240.3.134:7077 ESTABLISHED keepalive (4098.68/0/0)
tcp6 0 0 10.240.3.134:7077 10.240.3.134:44911 ESTABLISHED keepalive (4098.68/0/0)
```
Which proves that the keep alive setting is taking effect.
It's currently possible to enable TCP keep alive on the worker / executor, but is not possible to configure on other RPC connections. It's unclear to me why this could be the case. Keep alive is more important for the master to protect it against suddenly departing workers / executors, thus I think it's very important to have it. Particularly this makes the master resilient in case of using preemptible worker VMs in GCE. GCE has the concept of shutdown scripts, which it doesn't guarantee to execute. So workers often don't get shutdown gracefully and the TCP connections on the master linger as there's nothing to close them. Thus the need of enabling keep alive.
This enables keep-alive on connections besides the master's connections, but that shouldn't cause harm.
Closes#20512 from peshopetrov/master.
Authored-by: Petar Petrov <petar.petrov@leanplum.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Introducing shared polled ByteBuf allocators.
This feature can be enabled via the "spark.network.sharedByteBufAllocators.enabled" configuration.
When it is on then only two pooled ByteBuf allocators are created:
- one for transport servers where caching is allowed and
- one for transport clients where caching is disabled
This way the cache allowance remains as before.
Both shareable pools are created with numCores parameter set to 0 (which defaults to the available processors) as conf.serverThreads() and conf.clientThreads() are module dependant and the lazy creation of this allocators would lead to unpredicted behaviour.
When "spark.network.sharedByteBufAllocators.enabled" is false then a new allocator is created for every transport client and server separately as was before this PR.
## How was this patch tested?
Existing unit tests.
Closes#23278 from attilapiros/SPARK-24920.
Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This PR upgrades Mockito from 1.10.19 to 2.23.4. The following changes are required.
- Replace `org.mockito.Matchers` with `org.mockito.ArgumentMatchers`
- Replace `anyObject` with `any`
- Replace `getArgumentAt` with `getArgument` and add type annotation.
- Use `isNull` matcher in case of `null` is invoked.
```scala
saslHandler.channelInactive(null);
- verify(handler).channelInactive(any(TransportClient.class));
+ verify(handler).channelInactive(isNull());
```
- Make and use `doReturn` wrapper to avoid [SI-4775](https://issues.scala-lang.org/browse/SI-4775)
```scala
private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)
```
## How was this patch tested?
Pass the Jenkins with the existing tests.
Closes#23452 from dongjoon-hyun/SPARK-26536.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
`fireChannelRead` expects `io.netty.buffer.ByteBuf`.I checked that this is the only place which misuse `java.nio.ByteBuffer` in `network` module.
## How was this patch tested?
Pass the Jenkins with the existing tests.
Closes#23442 from dongjoon-hyun/SPARK-NETWORK-COMMON.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
## What changes were proposed in this pull request?
This PR addresses warning messages in Java files reported at [lgtm.com](https://lgtm.com).
[lgtm.com](https://lgtm.com) provides automated code review of Java/Python/JavaScript files for OSS projects. [Here](https://lgtm.com/projects/g/apache/spark/alerts/?mode=list&severity=warning) are warning messages regarding Apache Spark project.
This PR addresses the following warnings:
- Result of multiplication cast to wider type
- Implicit narrowing conversion in compound assignment
- Boxed variable is never null
- Useless null check
NOTE: `Potential input resource leak` looks false positive for now.
## How was this patch tested?
Existing UTs
Closes#23420 from kiszk/SPARK-26508.
Authored-by: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Recently, the ability to expose the metrics for YARN Shuffle Service was added as part of [SPARK-18364](https://github.com/apache/spark/pull/22485). We need to add some metrics to be able to determine the number of active connections as well as open connections to the external shuffle service to benchmark network and connection issues on large cluster environments.
Added two more shuffle server metrics for Spark Yarn shuffle service: numRegisteredConnections which indicate the number of registered connections to the shuffle service and numActiveConnections which indicate the number of active connections to the shuffle service at any given point in time.
If these metrics are outputted to a file, we get something like this:
1533674653489 default.shuffleService: Hostname=server1.abc.com, openBlockRequestLatencyMillis_count=729, openBlockRequestLatencyMillis_rate15=0.7110833548897356, openBlockRequestLatencyMillis_rate5=1.657808981793011, openBlockRequestLatencyMillis_rate1=2.2404486061620474, openBlockRequestLatencyMillis_rateMean=0.9242558551196706,
numRegisteredConnections=35,
blockTransferRateBytes_count=2635880512, blockTransferRateBytes_rate15=2578547.6094160094, blockTransferRateBytes_rate5=6048721.726302424, blockTransferRateBytes_rate1=8548922.518223226, blockTransferRateBytes_rateMean=3341878.633637769, registeredExecutorsSize=5, registerExecutorRequestLatencyMillis_count=5, registerExecutorRequestLatencyMillis_rate15=0.0027973949328659836, registerExecutorRequestLatencyMillis_rate5=0.0021278007987206426, registerExecutorRequestLatencyMillis_rate1=2.8270296777387467E-6, registerExecutorRequestLatencyMillis_rateMean=0.006339206380043053, numActiveConnections=35
Closes#22498 from pgandhi999/SPARK-18364.
Authored-by: pgandhi <pgandhi@oath.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
`org.apache.spark.network.RpcIntegrationSuite.sendRpcWithStreamFailures` is still flaky and here is error message:
```
sbt.ForkMain$ForkError: java.lang.AssertionError: Got a non-empty set [Failed to send RPC RPC 8249697863992194475 to /172.17.0.2:41177: java.io.IOException: Broken pipe]
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at org.apache.spark.network.RpcIntegrationSuite.assertErrorAndClosed(RpcIntegrationSuite.java:389)
at org.apache.spark.network.RpcIntegrationSuite.sendRpcWithStreamFailures(RpcIntegrationSuite.java:347)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
at com.novocode.junit.JUnitRunner$1.execute(JUnitRunner.java:132)
at sbt.ForkMain$Run$2.call(ForkMain.java:296)
at sbt.ForkMain$Run$2.call(ForkMain.java:286)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
This happened when the second RPC message was being sent but the connection was closed at the same time.
## How was this patch tested?
Jenkins
Closes#23109 from zsxwing/SPARK-26069-2.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This restores scaladoc artifact generation, which got dropped with the Scala 2.12 update. The change looks large, but is almost all due to needing to make the InterfaceStability annotations top-level classes (i.e. `InterfaceStability.Stable` -> `Stable`), unfortunately. A few inner class references had to be qualified too.
Lots of scaladoc warnings now reappear. We can choose to disable generation by default and enable for releases, later.
## How was this patch tested?
N/A; build runs scaladoc now.
Closes#23069 from srowen/SPARK-26026.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>