Commit 8b77b31835
### What changes were proposed in this pull request?

Remove the requirement to launch a task in order to reset the locality wait timer.

### Why are the changes needed?

Recently https://github.com/apache/spark/pull/27207 was merged, but it contained a bug which leads to undesirable behavior. The crux of the issue is that single resource offers could not reset the timer if there had been a previous reject followed by an allResourceOffer with no available resources. This led to a problem where, once the locality level reached ANY, single resource offers were all accepted, leaving allResourceOffers with no resources to utilize (hence no task being launched on an all-resource offer, and therefore no timer reset). The task manager would be stuck at the ANY locality level.

Noting down here the downsides of the reset conditions below, in case we want to follow up. As this is quite complex, I could easily be missing something, so please comment/respond if you have more bad-behavior scenarios or find something wrong here.

The format is:

> **Reset condition**
> - the unwanted side effect
> - the cause/use case

References below to locality increase/decrease mean:

```
PROCESS_LOCAL, NODE_LOCAL ... .. ANY
    ------ locality decrease --->
    <----- locality increase -----
```

**Task launch:**
- locality decrease:
  - Blacklisting, FAIR/FIFO scheduling, or task resource requirements can minimize tasks launched
- locality increase:
  - a single task launch decreases locality despite many tasks remaining

**No delay schedule reject since last allFreeResource offer:**
- locality decrease:
  - locality wait shorter than the interval between allFreeResource offers, which occur at least once per second
- locality increase:
  - a single resource (or none) not rejected despite many tasks remaining (other lower-priority tasks utilizing resources)

**Current impl - No delay schedule reject since last (allFreeResource offer + task launch):**
- locality decrease:
  - all of the above
- locality increase:
  - a single resource accepted and a task launched despite many tasks remaining

The current impl is an improvement on the legacy condition (task launch) in that the unintended locality decrease case is similar and the unintended locality increase case only occurs when the cluster is fully utilized. For the locality increase cases, perhaps a config which specifies a certain % of tasks in a taskset to finish before resetting locality levels would be helpful. **If** that were considered a good approach, then removing task launch as a requirement would perhaps eliminate most of the downsides listed above. Let me know if you have more ideas for eliminating the locality increase downside of **No delay schedule reject since last allFreeResource offer**.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

TaskSchedulerImplSuite

Also manually tested, similar to how I tested in https://github.com/apache/spark/pull/27207, using [this simple app](https://github.com/bmarcott/spark-test-apps/blob/master/src/main/scala/TestLocalityWait.scala).

With the new changes, given a locality wait of 10s, the behavior is generally: 10 seconds of locality being respected, followed by a single full utilization of resources at the ANY locality level, followed by 10 seconds of locality being respected, and so on.

If the legacy flag is enabled (spark.locality.wait.legacyResetOnTaskLaunch=true), the behavior is to schedule only PROCESS_LOCAL tasks (utilizing only a single executor).

cloud-fan tgravescs

Closes #28188 from bmarcott/nmarcott-locality-fix.
Authored-by: Nicholas Marcott <481161+bmarcott@users.noreply.github.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
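For reference, a hedged sketch of how the settings discussed in the commit message could be supplied when launching a shell: `spark.locality.wait` is Spark's standard locality wait setting, and `spark.locality.wait.legacyResetOnTaskLaunch` is the legacy flag mentioned above; the 10s value simply mirrors the manual test described in the commit message.

```
# Illustrative only: use a 10 second locality wait and (optionally) opt back
# into the legacy reset-on-task-launch behavior described in this commit.
./bin/spark-shell \
  --conf spark.locality.wait=10s \
  --conf spark.locality.wait.legacyResetOnTaskLaunch=true
```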
# Apache Spark
Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for stream processing.
## Online Documentation
You can find the latest Spark documentation, including a programming guide, on the project web page. This README file only contains basic setup instructions.
## Building Spark
Spark is built using Apache Maven. To build Spark and its example programs, run:
./build/mvn -DskipTests clean package
(You do not need to do this if you downloaded a pre-built package.)
More detailed documentation is available from the project site, at "Building Spark".
For general development tips, including info on developing Spark using an IDE, see "Useful Developer Tools".
## Interactive Scala Shell
The easiest way to start using Spark is through the Scala shell:
./bin/spark-shell
Try the following command, which should return 1,000,000,000:
scala> spark.range(1000 * 1000 * 1000).count()
## Interactive Python Shell
Alternatively, if you prefer Python, you can use the Python shell:
./bin/pyspark
And run the following command, which should also return 1,000,000,000:
>>> spark.range(1000 * 1000 * 1000).count()
## Example Programs
Spark also comes with several sample programs in the `examples` directory. To run one of them, use `./bin/run-example <class> [params]`. For example:
./bin/run-example SparkPi
will run the Pi example locally.
You can set the MASTER environment variable when running examples to submit
examples to a cluster. This can be a mesos:// or spark:// URL,
"yarn" to run on YARN, "local" to run
locally with one thread, or "local[N]" to run locally with N threads. You
can also use an abbreviated class name if the class is in the examples
package. For instance:
MASTER=spark://host:7077 ./bin/run-example SparkPi
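A variant of the same command (illustrative values) that runs the example locally with four threads instead of submitting to a cluster:

```
# Run the Pi example locally using 4 worker threads
MASTER="local[4]" ./bin/run-example SparkPi
```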
Many of the example programs print usage help if no params are given.
## Running Tests
Testing first requires building Spark. Once Spark is built, tests can be run using:
./dev/run-tests
Please see the guidance on how to run tests for a module, or individual tests.
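As one hedged illustration of running an individual suite (the suite named here is the one mentioned in the commit message above; see the developer guidance for the full options), the bundled sbt launcher can be used:

```
# Run a single test suite in the core module with the bundled sbt launcher
./build/sbt "core/testOnly org.apache.spark.scheduler.TaskSchedulerImplSuite"
```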
There is also a Kubernetes integration test; see resource-managers/kubernetes/integration-tests/README.md.
## A Note About Hadoop Versions
Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported storage systems. Because the protocols have changed in different versions of Hadoop, you must build Spark against the same version that your cluster runs.
Please refer to the build documentation at "Specifying the Hadoop Version and Enabling YARN" for detailed guidance on building for a particular distribution of Hadoop, including building for particular Hive and Hive Thriftserver distributions.
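For example, a build targeting a specific Hadoop version might look like the following (the profile and version number are illustrative assumptions; consult the build documentation for values matching your cluster):

```
# Build with YARN support against an explicitly chosen Hadoop version
./build/mvn -Pyarn -Dhadoop.version=2.7.4 -DskipTests clean package
```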
## Configuration
Please refer to the Configuration Guide in the online documentation for an overview on how to configure Spark.
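As a minimal sketch (the property and value are illustrative; the Configuration Guide lists the real options for your deployment), defaults can be placed in `conf/spark-defaults.conf`, which every spark-shell or spark-submit invocation then picks up:

```
# Illustrative: create a defaults file from the bundled template and add a property
cp conf/spark-defaults.conf.template conf/spark-defaults.conf
echo "spark.locality.wait    10s" >> conf/spark-defaults.conf
```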
## Contributing
Please review the Contribution to Spark guide for information on how to get started contributing to the project.