Apache Spark - A unified analytics engine for large-scale data processing
Go to file
Josh Rosen 852f4de2d3 [SPARK-7873] Allow KryoSerializerInstance to create multiple streams at the same time
This is a somewhat obscure bug, but I think that it will seriously impact KryoSerializer users who use custom registrators which disabled auto-reset. When auto-reset is disabled, then this breaks things in some of our shuffle paths which actually end up creating multiple OutputStreams from the same shared SerializerInstance (which is unsafe).

This was introduced by a patch (SPARK-3386) which enables serializer re-use in some of the shuffle paths, since constructing new serializer instances is actually pretty costly for KryoSerializer.  We had already fixed another corner-case (SPARK-7766) bug related to this, but missed this one.

I think that the root problem here is that KryoSerializerInstance can be used in a way which is unsafe even within a single thread, e.g. by creating multiple open OutputStreams from the same instance or by interleaving deserialize and deserializeStream calls. I considered a smaller patch which adds assertions to guard against this type of "misuse" but abandoned that approach after I realized how convoluted the Scaladoc became.

This patch fixes this bug by making it legal to create multiple streams from the same KryoSerializerInstance.  Internally, KryoSerializerInstance now implements a  `borrowKryo()` / `releaseKryo()` API that's backed by a "pool" of capacity 1. Each call to a KryoSerializerInstance method will borrow the Kryo, do its work, then release the serializer instance back to the pool. If the pool is empty and we need an instance, it will allocate a new Kryo on-demand. This makes it safe for multiple OutputStreams to be opened from the same serializer. If we try to release a Kryo back to the pool but the pool already contains a Kryo, then we'll just discard the new Kryo. I don't think there's a clear benefit to having a larger pool since our usages tend to fall into two cases, a) where we only create a single OutputStream and b) where we create a huge number of OutputStreams with the same lifecycle, then destroy the KryoSerializerInstance (this is what's happening in the bypassMergeSort code path that my regression test hits).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #6415 from JoshRosen/SPARK-7873 and squashes the following commits:

00b402e [Josh Rosen] Initialize eagerly to fix a failing test
ba55d20 [Josh Rosen] Add explanatory comments
3f1da96 [Josh Rosen] Guard against duplicate close()
ab457ca [Josh Rosen] Sketch a loan/release based solution.
9816e8f [Josh Rosen] Add a failing test showing how deserialize() and deserializeStream() can interfere.
7350886 [Josh Rosen] Add failing regression test for SPARK-7873
2015-05-27 20:19:53 -07:00
assembly [SPARK-6869] [PYSPARK] Add pyspark archives path to PYTHONPATH 2015-05-08 08:44:46 -05:00
bagel [SPARK-6758]block the right jetty package in log 2015-04-09 17:44:08 -04:00
bin Limit help option regex 2015-05-01 19:26:55 +01:00
build SPARK-5856: In Maven build script, launch Zinc with more memory 2015-02-17 10:10:01 -08:00
conf [SPARK-7811] Fix typo on slf4j configuration on metrics.properties.tem… 2015-05-24 21:48:27 +01:00
core [SPARK-7873] Allow KryoSerializerInstance to create multiple streams at the same time 2015-05-27 20:19:53 -07:00
data/mllib [SPARK-7574] [ML] [DOC] User guide for OneVsRest 2015-05-22 13:18:08 -07:00
dev [SPARK-7832] [Build] Always run SQL tests in master build. 2015-05-25 18:23:58 -07:00
docker [SPARK-2691] [MESOS] Support for Mesos DockerInfo 2015-05-01 18:41:22 -07:00
docs [SPARK-7850][BUILD] Hive 0.12.0 profile in POM should be removed 2015-05-27 00:18:42 -07:00
ec2 [SPARK-3674] YARN support in Spark EC2 2015-05-26 15:01:27 -07:00
examples Close HBaseAdmin at the end of HBaseTest 2015-05-25 08:19:42 +01:00
external [SPARK-7621] [STREAMING] Report Kafka errors to StreamingListeners 2015-05-18 18:13:29 -07:00
extras [SPARK-7838] [STREAMING] Set scope for kinesis stream 2015-05-22 23:05:54 -07:00
graphx [SPARK-5854] personalized page rank 2015-05-01 11:55:43 -07:00
launcher [MINOR] Avoid passing the PermGenSize option to IBM JVMs. 2015-05-13 21:00:12 +01:00
mllib [SPARK-7535] [.1] [MLLIB] minor changes to the pipeline API 2015-05-26 23:51:32 -07:00
network [SPARK-7726] Fix Scaladoc false errors 2015-05-19 12:14:48 -07:00
project [SPARK-7805] [SQL] Move SQLTestUtils.scala and ParquetTest.scala to src/test 2015-05-24 09:51:37 -07:00
python [SPARK-7339] [PYSPARK] PySpark shuffle spill memory sometimes are not correct 2015-05-26 08:35:39 -07:00
R [SPARK-6811] Copy SparkR lib in make-distribution.sh 2015-05-23 00:04:01 -07:00
repl [SPARK-7726] Fix Scaladoc false errors 2015-05-19 12:14:48 -07:00
sbin [SPARK-5412] [DEPLOY] Cannot bind Master to a specific hostname as per the documentation 2015-05-15 11:30:19 -07:00
sbt Adde LICENSE Header to build/mvn, build/sbt and sbt/sbt 2014-12-29 10:48:53 -08:00
sql [SPARK-7907] [SQL] [UI] Rename tab ThriftServer to SQL. 2015-05-27 20:04:29 -07:00
streaming [SPARK-7777][Streaming] Handle the case when there is no block in a batch 2015-05-23 02:11:17 -07:00
tools [SPARK-4550] In sort-based shuffle, store map outputs in serialized form 2015-04-30 23:14:14 -07:00
unsafe [SPARK-7800] isDefined should not marked too early in putNewKey 2015-05-21 23:12:00 +01:00
yarn [SPARK-6602] [CORE] Remove some places in core that calling SparkEnv.actorSystem 2015-05-26 15:28:49 -07:00
.gitattributes [SPARK-3870] EOL character enforcement 2014-10-31 12:39:52 -07:00
.gitignore [MINOR] Ignore python/lib/pyspark.zip 2015-05-08 14:06:02 -07:00
.rat-excludes [WEBUI] Remove debug feature for vis.js 2015-05-08 14:06:37 -07:00
CONTRIBUTING.md [SPARK-6889] [DOCS] CONTRIBUTING.md updates to accompany contribution doc updates 2015-04-21 22:34:31 -07:00
LICENSE [BUILD] update jblas dependency version to 1.2.4 2015-05-16 18:17:48 +01:00
make-distribution.sh [HOTFIX] Copy SparkR lib if it exists in make-distribution 2015-05-23 12:28:16 -07:00
NOTICE SPARK-1827. LICENSE and NOTICE files need a refresh to contain transitive dependency info 2014-05-14 09:38:33 -07:00
pom.xml [SPARK-7850][BUILD] Hive 0.12.0 profile in POM should be removed 2015-05-27 00:18:42 -07:00
README.md [MINOR] [DOCS] Fix the link to test building info on the wiki 2015-05-12 00:25:43 +01:00
scalastyle-config.xml [SPARK-6428] Turn on explicit type checking for public methods. 2015-04-03 01:25:02 -07:00
tox.ini [SPARK-7427] [PYSPARK] Make sharedParams match in Scala, Python 2015-05-10 19:18:32 -07:00

Apache Spark

Spark is a fast and general cluster computing system for Big Data. It provides high-level APIs in Scala, Java, and Python, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for stream processing.

http://spark.apache.org/

Online Documentation

You can find the latest Spark documentation, including a programming guide, on the project web page and project wiki. This README file only contains basic setup instructions.

Building Spark

Spark is built using Apache Maven. To build Spark and its example programs, run:

mvn -DskipTests clean package

(You do not need to do this if you downloaded a pre-built package.) More detailed documentation is available from the project site, at "Building Spark".

Interactive Scala Shell

The easiest way to start using Spark is through the Scala shell:

./bin/spark-shell

Try the following command, which should return 1000:

scala> sc.parallelize(1 to 1000).count()

Interactive Python Shell

Alternatively, if you prefer Python, you can use the Python shell:

./bin/pyspark

And run the following command, which should also return 1000:

>>> sc.parallelize(range(1000)).count()

Example Programs

Spark also comes with several sample programs in the examples directory. To run one of them, use ./bin/run-example <class> [params]. For example:

./bin/run-example SparkPi

will run the Pi example locally.

You can set the MASTER environment variable when running examples to submit examples to a cluster. This can be a mesos:// or spark:// URL, "yarn-cluster" or "yarn-client" to run on YARN, and "local" to run locally with one thread, or "local[N]" to run locally with N threads. You can also use an abbreviated class name if the class is in the examples package. For instance:

MASTER=spark://host:7077 ./bin/run-example SparkPi

Many of the example programs print usage help if no params are given.

Running Tests

Testing first requires building Spark. Once Spark is built, tests can be run using:

./dev/run-tests

Please see the guidance on how to run tests for a module, or individual tests.

A Note About Hadoop Versions

Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported storage systems. Because the protocols have changed in different versions of Hadoop, you must build Spark against the same version that your cluster runs.

Please refer to the build documentation at "Specifying the Hadoop Version" for detailed guidance on building for a particular distribution of Hadoop, including building for particular Hive and Hive Thriftserver distributions. See also "Third Party Hadoop Distributions" for guidance on building a Spark application that works with a particular distribution.

Configuration

Please refer to the Configuration guide in the online documentation for an overview on how to configure Spark.