Apache Spark - A unified analytics engine for large-scale data processing
Go to file
Andrew Or 0eb4a7fb0f [SPARK-4480] Avoid many small spills in external data structures
**Summary.** Currently, we may spill many small files in `ExternalAppendOnlyMap` and `ExternalSorter`. The underlying root cause of this is summarized in [SPARK-4452](https://issues.apache.org/jira/browse/SPARK-4452). This PR does not address this root cause, but simply provides the guarantee that we never spill the in-memory data structure if its size is less than a configurable threshold of 5MB. This config is not documented because we don't want users to set it themselves, and it is not hard-coded because we need to change it in tests.

**Symptom.** Each spill is orders of magnitude smaller than 1MB, and there are many spills. In environments where the ulimit is set, this frequently causes "too many open file" exceptions observed in [SPARK-3633](https://issues.apache.org/jira/browse/SPARK-3633).
```
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4792 B to disk (292769 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4760 B to disk (292770 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4520 B to disk (292771 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4560 B to disk (292772 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4792 B to disk (292773 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4784 B to disk (292774 spills so far)
```

**Reproduction.** I ran the following on a small 4-node cluster with 512MB executors. Note that the back-to-back shuffle here is necessary for reasons described in [SPARK-4522](https://issues.apache.org/jira/browse/SPARK-4452). The second shuffle is a `reduceByKey` because it performs a map-side combine.
```
sc.parallelize(1 to 100000000, 100)
  .map { i => (i, i) }
  .groupByKey()
  .reduceByKey(_ ++ _)
  .count()
```
Before the change, I notice that each thread may spill up to 1000 times, and the size of each spill is on the order of 10KB. After the change, each thread spills only up to 20 times in the worst case, and the size of each spill is on the order of 1MB.

Author: Andrew Or <andrew@databricks.com>

Closes #3353 from andrewor14/avoid-small-spills and squashes the following commits:

49f380f [Andrew Or] Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/spark into avoid-small-spills
27d6966 [Andrew Or] Merge branch 'master' of github.com:apache/spark into avoid-small-spills
f4736e3 [Andrew Or] Fix tests
a919776 [Andrew Or] Avoid many small spills
2014-11-19 18:07:27 -08:00
assembly Bumping version to 1.3.0-SNAPSHOT. 2014-11-18 21:24:18 -08:00
bagel Bumping version to 1.3.0-SNAPSHOT. 2014-11-18 21:24:18 -08:00
bin [SPARK-4017] show progress bar in console 2014-11-18 13:37:21 -08:00
conf SPARK-3663 Document SPARK_LOG_DIR and SPARK_PID_DIR 2014-11-14 13:33:35 -08:00
core [SPARK-4480] Avoid many small spills in external data structures 2014-11-19 18:07:27 -08:00
data/mllib SPARK-2363. Clean MLlib's sample data files 2014-07-13 19:27:43 -07:00
dev SPARK-4466: Provide support for publishing Scala 2.11 artifacts to Maven 2014-11-17 21:07:50 -08:00
docker [SPARK-1342] Scala 2.10.4 2014-04-01 18:35:50 -07:00
docs Updating GraphX programming guide and documentation 2014-11-19 16:53:33 -08:00
ec2 [SPARK-4137] [EC2] Don't change working dir on user 2014-11-05 20:45:35 -08:00
examples SPARK-4455 Exclude dependency on hbase-annotations module 2014-11-19 00:55:39 -08:00
external SPARK-3962 Marked scope as provided for external projects. 2014-11-19 14:18:10 -08:00
extras Bumping version to 1.3.0-SNAPSHOT. 2014-11-18 21:24:18 -08:00
graphx Updating GraphX programming guide and documentation 2014-11-19 16:53:33 -08:00
mllib Bumping version to 1.3.0-SNAPSHOT. 2014-11-18 21:24:18 -08:00
network Bumping version to 1.3.0-SNAPSHOT. 2014-11-18 21:24:18 -08:00
project Updating GraphX programming guide and documentation 2014-11-19 16:53:33 -08:00
python [SPARK-4384] [PySpark] improve sort spilling 2014-11-19 15:45:37 -08:00
repl Bumping version to 1.3.0-SNAPSHOT. 2014-11-18 21:24:18 -08:00
sbin [SPARK-4110] Wrong comments about default settings in spark-daemon.sh 2014-10-28 12:29:01 -07:00
sbt [SPARK-4312] bash doesn't have "die" 2014-11-10 12:37:56 -08:00
sql Bumping version to 1.3.0-SNAPSHOT. 2014-11-18 21:24:18 -08:00
streaming [SPARK-4294][Streaming] UnionDStream stream should express the requirements in the same way as TransformedDStream 2014-11-19 15:53:06 -08:00
tools Bumping version to 1.3.0-SNAPSHOT. 2014-11-18 21:24:18 -08:00
yarn Bumping version to 1.3.0-SNAPSHOT. 2014-11-18 21:24:18 -08:00
.gitattributes [SPARK-3870] EOL character enforcement 2014-10-31 12:39:52 -07:00
.gitignore [SPARK-3584] sbin/slaves doesn't work when we use password authentication for SSH 2014-09-25 16:49:15 -07:00
.rat-excludes Support cross building for Scala 2.11 2014-11-11 21:36:48 -08:00
CONTRIBUTING.md [Docs] minor grammar fix 2014-09-17 12:33:09 -07:00
LICENSE [SPARK-4242] [Core] Add SASL to external shuffle service 2014-11-05 14:38:43 -08:00
make-distribution.sh [HOT FIX] make-distribution.sh fails if Yarn shuffle jar DNE 2014-11-13 11:54:45 -08:00
NOTICE SPARK-1827. LICENSE and NOTICE files need a refresh to contain transitive dependency info 2014-05-14 09:38:33 -07:00
pom.xml Bumping version to 1.3.0-SNAPSHOT. 2014-11-18 21:24:18 -08:00
README.md SPARK-971 [DOCS] Link to Confluence wiki from project website / documentation 2014-11-09 17:40:48 -08:00
scalastyle-config.xml [Core] Upgrading ScalaStyle version to 0.5 and removing SparkSpaceAfterCommentStartChecker. 2014-10-16 02:05:44 -04:00
tox.ini [SPARK-3073] [PySpark] use external sort in sortBy() and sortByKey() 2014-08-26 16:57:40 -07:00

Apache Spark

Spark is a fast and general cluster computing system for Big Data. It provides high-level APIs in Scala, Java, and Python, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for stream processing.

http://spark.apache.org/

Online Documentation

You can find the latest Spark documentation, including a programming guide, on the project web page and project wiki. This README file only contains basic setup instructions.

Building Spark

Spark is built using Apache Maven. To build Spark and its example programs, run:

mvn -DskipTests clean package

(You do not need to do this if you downloaded a pre-built package.) More detailed documentation is available from the project site, at "Building Spark with Maven".

Interactive Scala Shell

The easiest way to start using Spark is through the Scala shell:

./bin/spark-shell

Try the following command, which should return 1000:

scala> sc.parallelize(1 to 1000).count()

Interactive Python Shell

Alternatively, if you prefer Python, you can use the Python shell:

./bin/pyspark

And run the following command, which should also return 1000:

>>> sc.parallelize(range(1000)).count()

Example Programs

Spark also comes with several sample programs in the examples directory. To run one of them, use ./bin/run-example <class> [params]. For example:

./bin/run-example SparkPi

will run the Pi example locally.

You can set the MASTER environment variable when running examples to submit examples to a cluster. This can be a mesos:// or spark:// URL, "yarn-cluster" or "yarn-client" to run on YARN, and "local" to run locally with one thread, or "local[N]" to run locally with N threads. You can also use an abbreviated class name if the class is in the examples package. For instance:

MASTER=spark://host:7077 ./bin/run-example SparkPi

Many of the example programs print usage help if no params are given.

Running Tests

Testing first requires building Spark. Once Spark is built, tests can be run using:

./dev/run-tests

Please see the guidance on how to run all automated tests.

A Note About Hadoop Versions

Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported storage systems. Because the protocols have changed in different versions of Hadoop, you must build Spark against the same version that your cluster runs.

Please refer to the build documentation at "Specifying the Hadoop Version" for detailed guidance on building for a particular distribution of Hadoop, including building for particular Hive and Hive Thriftserver distributions. See also "Third Party Hadoop Distributions" for guidance on building a Spark application that works with a particular distribution.

Configuration

Please refer to the Configuration guide in the online documentation for an overview on how to configure Spark.