Apache Spark - A unified analytics engine for large-scale data processing
Josh Rosen 83b7a1c650 [SPARK-4019] [SPARK-3740] Fix MapStatus compression bug that could lead to empty results or Snappy errors
This commit fixes a bug in MapStatus that could cause jobs to wrongly return
empty results if those jobs contained stages with more than 2000 partitions
where most of those partitions were empty.

For jobs with > 2000 partitions, MapStatus uses HighlyCompressedMapStatus,
which only stores the average size of blocks.  If the average block size is
zero, then this will cause all blocks to be reported as empty, causing
BlockFetcherIterator to mistakenly skip them.

For example, this would return an empty result:

    sc.makeRDD(0 until 10, 1000).repartition(2001).collect()

This can also lead to deserialization errors (e.g. Snappy decoding errors)
for jobs with > 2000 partitions where the average block size is non-zero but
there is at least one empty block.  In this case, the BlockFetcher attempts to
fetch empty blocks and fails when trying to deserialize them.

The root problem here is that MapStatus has a (previously undocumented)
correctness property that was violated by HighlyCompressedMapStatus:

    If a block is non-empty, then getSizeForBlock must be non-zero.

I fixed this by modifying HighlyCompressedMapStatus to store the average size
of _non-empty_ blocks and to use a compressed bitmap to track which blocks are
empty.
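
To make that invariant concrete, here is a minimal sketch of the fixed
compression scheme. It is not the actual HighlyCompressedMapStatus code:
the real fix uses a compressed Roaring bitmap, while this sketch uses
java.util.BitSet and a made-up class name (ApproxMapStatus) purely to keep
the example self-contained. The key point is that the average is taken over
non-empty blocks only and empty blocks are tracked explicitly, so a
non-empty block always reports a non-zero size.

    import java.util.BitSet

    // Simplified stand-in for the fixed map status (hypothetical name).
    class ApproxMapStatus(blockSizes: Array[Long]) {
      private val emptyBlocks = new BitSet(blockSizes.length)
      private val avgNonEmptySize: Long = {
        var total = 0L
        var nonEmpty = 0
        for (i <- blockSizes.indices) {
          if (blockSizes(i) == 0L) emptyBlocks.set(i)
          else { total += blockSizes(i); nonEmpty += 1 }
        }
        // Average over non-empty blocks only; this is >= 1 whenever at
        // least one block is non-empty.
        if (nonEmpty > 0) total / nonEmpty else 0L
      }

      // Invariant: returns 0 if and only if the block is empty.
      def getSizeForBlock(reduceId: Int): Long =
        if (emptyBlocks.get(reduceId)) 0L else avgNonEmptySize
    }

By contrast, the buggy version averaged over all blocks, so a mostly-empty
shuffle could round the average down to zero and make every block look
empty to BlockFetcherIterator.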

I also removed a test that was broken as originally written: it attempted
to check that HighlyCompressedMapStatus's size estimation error was < 10%,
but HighlyCompressedMapStatus is only used for map statuses with > 2000
partitions, while the test only created 50.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #2866 from JoshRosen/spark-4019 and squashes the following commits:

fc8b490 [Josh Rosen] Roll back hashset change, which didn't improve performance.
5faa0a4 [Josh Rosen] Incorporate review feedback
c8b8cae [Josh Rosen] Two performance fixes:
3b892dd [Josh Rosen] Address Reynold's review comments
ba2e71c [Josh Rosen] Add missing newline
609407d [Josh Rosen] Use Roaring Bitmap to track non-empty blocks.
c23897a [Josh Rosen] Use sets when comparing collect() results
91276a3 [Josh Rosen] [SPARK-4019] Fix MapStatus compression bug that could lead to empty results.
2014-10-23 16:39:32 -07:00
assembly SPARK-3638 | Forced a compatible version of http client in kinesis-asl profile 2014-10-01 18:31:18 -07:00
bagel [SPARK-3748] Log thread name in unit test logs 2014-10-01 01:03:49 -07:00
bin [SPARK-3943] Some scripts bin\*.cmd pollutes environment variables in Windows 2014-10-14 18:50:14 -07:00
conf [SPARK-3584] sbin/slaves doesn't work when we use password authentication for SSH 2014-09-25 16:49:15 -07:00
core [SPARK-4019] [SPARK-3740] Fix MapStatus compression bug that could lead to empty results or Snappy errors 2014-10-23 16:39:32 -07:00
data/mllib SPARK-2363. Clean MLlib's sample data files 2014-07-13 19:27:43 -07:00
dev [SPARK-3843][Minor] Cleanup scalastyle.txt at the end of running dev/scalastyle 2014-10-08 15:19:19 -07:00
docker [SPARK-1342] Scala 2.10.4 2014-04-01 18:35:50 -07:00
docs [SPARK-4055][MLlib] Inconsistent spelling 'MLlib' and 'MLLib' 2014-10-23 09:19:32 -07:00
ec2 Fetch from branch v4 in Spark EC2 script. 2014-10-08 22:25:15 -07:00
examples [SPARK-4055][MLlib] Inconsistent spelling 'MLlib' and 'MLLib' 2014-10-23 09:19:32 -07:00
external [SPARK-3912][Streaming] Fixed flakyFlumeStreamSuite 2014-10-13 22:46:49 -07:00
extras [SPARK-3748] Log thread name in unit test logs 2014-10-01 01:03:49 -07:00
graphx SPARK-1813. Add a utility to SparkConf that makes using Kryo really easy 2014-10-21 21:53:09 -07:00
mllib [SPARK-4055][MLlib] Inconsistent spelling 'MLlib' and 'MLLib' 2014-10-23 09:19:32 -07:00
project specify unidocGenjavadocVersion of 0.8 2014-10-23 13:46:55 -07:00
python Fix for sampling error in NumPy v1.9 [SPARK-3995][PYSPARK] 2014-10-22 09:33:12 -07:00
repl SPARK-3811 [CORE] More robust / standard Utils.deleteRecursively, Utils.createTempDir 2014-10-09 18:21:59 -07:00
sbin [SPARK-3696]Do not override the user-difined conf_dir 2014-10-03 10:42:41 -07:00
sbt SPARK-3337 Paranoid quoting in shell to allow install dirs with spaces within. 2014-09-08 10:24:15 -07:00
sql [SQL]redundant methods for broadcast 2014-10-21 16:20:58 -07:00
streaming replace awaitTransformation with awaitTermination in scaladoc/javadoc 2014-10-21 09:37:17 -07:00
tools [SPARK-3433][BUILD] Fix for Mima false-positives with @DeveloperAPI and @Experimental annotations. 2014-09-15 21:14:00 -07:00
yarn [SPARK-3877][YARN] Throw an exception when application is not successful so that the exit code wil be set to 1 2014-10-22 15:04:41 -07:00
.gitignore [SPARK-3584] sbin/slaves doesn't work when we use password authentication for SSH 2014-09-25 16:49:15 -07:00
.rat-excludes [SPARK-3584] sbin/slaves doesn't work when we use password authentication for SSH 2014-09-25 16:49:15 -07:00
CONTRIBUTING.md [Docs] minor grammar fix 2014-09-17 12:33:09 -07:00
LICENSE [SPARK-3073] [PySpark] use external sort in sortBy() and sortByKey() 2014-08-26 16:57:40 -07:00
make-distribution.sh Slaves file is now a template. 2014-09-26 22:21:50 -07:00
NOTICE SPARK-1827. LICENSE and NOTICE files need a refresh to contain transitive dependency info 2014-05-14 09:38:33 -07:00
pom.xml [SPARK-4019] [SPARK-3740] Fix MapStatus compression bug that could lead to empty results or Snappy errors 2014-10-23 16:39:32 -07:00
README.md Update Building Spark link. 2014-10-20 19:16:35 -07:00
scalastyle-config.xml [Core] Upgrading ScalaStyle version to 0.5 and removing SparkSpaceAfterCommentStartChecker. 2014-10-16 02:05:44 -04:00
tox.ini [SPARK-3073] [PySpark] use external sort in sortBy() and sortByKey() 2014-08-26 16:57:40 -07:00

Apache Spark

Spark is a fast and general cluster computing system for Big Data. It provides high-level APIs in Scala, Java, and Python, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for stream processing.

http://spark.apache.org/

Online Documentation

You can find the latest Spark documentation, including a programming guide, on the project web page. This README file only contains basic setup instructions.

Building Spark

Spark is built using Apache Maven. To build Spark and its example programs, run:

mvn -DskipTests clean package

(You do not need to do this if you downloaded a pre-built package.) More detailed documentation is available from the project site, at "Building Spark with Maven".

Interactive Scala Shell

The easiest way to start using Spark is through the Scala shell:

./bin/spark-shell

Try the following command, which should return 1000:

scala> sc.parallelize(1 to 1000).count()

Interactive Python Shell

Alternatively, if you prefer Python, you can use the Python shell:

./bin/pyspark

And run the following command, which should also return 1000:

>>> sc.parallelize(range(1000)).count()

Example Programs

Spark also comes with several sample programs in the examples directory. To run one of them, use ./bin/run-example <class> [params]. For example:

./bin/run-example SparkPi

will run the Pi example locally.

You can set the MASTER environment variable when running examples to submit examples to a cluster. This can be a mesos:// or spark:// URL, "yarn-cluster" or "yarn-client" to run on YARN, "local" to run locally with one thread, or "local[N]" to run locally with N threads. You can also use an abbreviated class name if the class is in the examples package. For instance:

MASTER=spark://host:7077 ./bin/run-example SparkPi

Many of the example programs print usage help if no params are given.

Running Tests

Testing first requires building Spark. Once Spark is built, tests can be run using:

./dev/run-tests

Please see the guidance on how to run all automated tests.
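If you only want to exercise a single suite while developing, the sbt build can run it directly. For example (a hypothetical invocation using the sbt launcher bundled in the repository; adjust the project name and suite pattern to what you need):

./sbt/sbt "core/test-only *MapStatusSuite"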

A Note About Hadoop Versions

Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported storage systems. Because the protocols have changed in different versions of Hadoop, you must build Spark against the same version that your cluster runs.

Please refer to the build documentation at "Specifying the Hadoop Version" for detailed guidance on building for a particular distribution of Hadoop, including building for particular Hive and Hive Thriftserver distributions. See also "Third Party Hadoop Distributions" for guidance on building a Spark application that works with a particular distribution.
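For example, a build against a specific Hadoop version typically sets the hadoop.version property along with the matching profile (the supported versions and profiles are listed in the build documentation; the version below is only an illustration):

mvn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package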

Configuration

Please refer to the Configuration guide in the online documentation for an overview on how to configure Spark.