Apache Spark - A unified analytics engine for large-scale data processing
Joseph K. Bradley 0dc2b6361d [SPARK-1545] [mllib] Add Random Forests
This PR adds RandomForest to MLlib.  The implementation is basic, and future performance optimizations will be important.  (Note: RFs = Random Forests.)

# Overview

## RandomForest
* trains multiple trees at once to reduce the number of passes over the data
* allows feature subsets at each node
* uses a queue of nodes instead of fixed groups for each level

This implementation is based on an implementation by manishamde and the [Alpine Labs Sequoia Forest](https://github.com/AlpineNow/SparkML2) by codedeft (in particular, the TreePoint, BaggedPoint, and node queue implementations).  Thank you for your inputs!

## Testing

Correctness: This has been tested for correctness with the test suites and with DecisionTreeRunner on example datasets.

Performance: This has been performance tested using [this branch of spark-perf](https://github.com/jkbradley/spark-perf/tree/rfs).  Results below.

### Regression tests for DecisionTree

Summary: For training 1 tree, there are small regressions, especially from feature subsampling.

In the table below, each row is a single (random) dataset.  The 2 different sets of result columns are for 2 different RF implementations:
* (numTrees): This is from an earlier commit, after implementing RandomForest to train multiple trees at once.  It does not include any code for feature subsampling.
* (feature subsets): This is from the current PR's code, after implementing feature subsampling.

These tests were meant to identify regressions in DecisionTree, so they train 1 tree with all of the features (i.e., no feature subsampling).

These were run on an EC2 cluster with 15 workers, training 1 tree with maxDepth = 5 (= 6 levels).  Speedup values < 1 indicate slowdowns from the old DecisionTree implementation.

numInstances | numFeatures | runtime (sec), numTrees | speedup, numTrees | runtime (sec), feature subsets | speedup, feature subsets
---- | ---- | ---- | ---- | ---- | ----
20000 | 100 | 4.051 | 1.044433473 | 4.478 | 0.9448414471
20000 | 500 | 8.472 | 1.104461756 | 9.315 | 1.004508857
20000 | 1500 | 19.354 | 1.05854087 | 20.863 | 0.9819776638
20000 | 3500 | 43.674 | 1.072033704 | 45.887 | 1.020332556
200000 | 100 | 4.196 | 1.171830315 | 4.848 | 1.014232673
200000 | 500 | 8.926 | 1.082791844 | 9.771 | 0.989151571
200000 | 1500 | 20.58 | 1.068415938 | 22.134 | 0.9934038131
200000 | 3500 | 48.043 | 1.075203464 | 52.249 | 0.9886505005
2000000 | 100 | 4.944 | 1.01355178 | 5.796 | 0.8645617667
2000000 | 500 | 11.11 | 1.016831683 | 12.482 | 0.9050632911
2000000 | 1500 | 31.144 | 1.017852556 | 35.274 | 0.8986789136
2000000 | 3500 | 79.981 | 1.085382778 | 101.105 | 0.8586123337
20000000 | 100 | 8.304 | 0.9270231214 | 9.073 | 0.8484514494
20000000 | 500 | 28.174 | 1.083268262 | 34.236 | 0.8914592826
20000000 | 1500 | 143.97 | 0.9579634646 | 159.275 | 0.8659111599
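
As a reading aid for the table, the sketch below assumes that speedup is defined as (old DecisionTree runtime) / (new runtime). That definition is not stated explicitly above, but under it both result columns of the first row imply the same baseline runtime. The function name is hypothetical.

```python
# Assumption: speedup = old_runtime / new_runtime,
# so old_runtime = new_runtime * speedup.
def implied_baseline(runtime_sec, speedup):
    """Runtime of the old DecisionTree implied by one (runtime, speedup) pair."""
    return runtime_sec * speedup

# First row of the table (numInstances = 20000, numFeatures = 100).
baseline_num_trees = implied_baseline(4.051, 1.044433473)
baseline_feature_subsets = implied_baseline(4.478, 0.9448414471)

# Both values should agree up to rounding in the reported figures.
print(round(baseline_num_trees, 3), round(baseline_feature_subsets, 3))
```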

### Tests for forests

I have run other tests with numTrees=10 and with sqrt(numFeatures), and those indicate that multi-model training and feature subsets can speed up training for forests, especially when training deeper trees.

# Details on specific classes

## Changes to DecisionTree
* Main train() method is now in RandomForest.
* findBestSplits() is no longer needed.  (It split levels into groups, but we now use a queue of nodes.)
* Many small changes to support RFs.  (Note: These methods should be moved to RandomForest.scala in a later PR, but are in DecisionTree.scala to make code comparison easier.)

## RandomForest
* Main train() method is from old DecisionTree.
* selectNodesToSplit: Note that it selects nodes and feature subsets jointly to track memory usage.
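
The idea of selecting nodes and feature subsets jointly under a memory budget can be sketched as follows. This is a minimal illustration in plain Python, not the code in this PR: the function signature, the cost model (one memory unit per sampled feature), and the rule of always taking at least one node are all assumptions made for the example.

```python
import random
from collections import deque

def select_nodes_to_split(node_queue, num_features, features_per_node,
                          max_memory_units, rng):
    """Pop (tree_index, node_id) pairs until the memory budget is used up.

    Hypothetical cost model: each node costs one unit per sampled feature.
    Returns a list of (tree_index, node_id, feature_subset) tuples.
    """
    selected = []
    memory_used = 0
    while node_queue:
        cost = features_per_node
        # Always take at least one node so that training can make progress.
        if selected and memory_used + cost > max_memory_units:
            break
        tree_index, node_id = node_queue.popleft()
        # The feature subset is drawn at selection time, so its cost is known.
        feature_subset = sorted(rng.sample(range(num_features), features_per_node))
        selected.append((tree_index, node_id, feature_subset))
        memory_used += cost
    return selected

# One shared queue holds nodes from all trees.
queue = deque([(0, 1), (1, 1), (2, 1), (0, 2)])
group = select_nodes_to_split(queue, num_features=10, features_per_node=3,
                              max_memory_units=7, rng=random.Random(42))
```

With a budget of 7 units and 3 features per node, two nodes fit in the first group and the remaining two stay in the queue for a later pass.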

## RandomForestModel
* Stores an Array[DecisionTreeModel]
* Prediction:
 * For classification, most common label.  For regression, mean.
 * We could support other methods later.
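
The two aggregation rules above can be illustrated with a short sketch. It operates on a plain list of per-tree predictions rather than on an Array[DecisionTreeModel], and the function names are hypothetical. Tie-breaking (first label encountered wins) is an artifact of this sketch, not a statement about the PR's behavior.

```python
from collections import Counter

def predict_classification(tree_predictions):
    """Return the most common label among the trees' predictions."""
    return Counter(tree_predictions).most_common(1)[0][0]

def predict_regression(tree_predictions):
    """Return the mean of the trees' predictions."""
    return sum(tree_predictions) / len(tree_predictions)

label = predict_classification([1.0, 0.0, 1.0])
value = predict_regression([1.0, 2.0, 3.0])
```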

## examples/.../DecisionTreeRunner
* This now takes numTrees and featureSubsetStrategy, to support RFs.

## DTStatsAggregator
* 2 types of functionality (w/ and w/o subsampling features): These require different indexing methods.  (We could treat both as subsampling, but this is less efficient.)
  DTStatsAggregator is now abstract, and 2 child classes implement these 2 types of functionality.

## impurities
* These now take instance weights.
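
To illustrate what taking instance weights means for an impurity measure, here is a minimal sketch of a weighted Gini impurity in plain Python. Gini is used only as an example, and the function is hypothetical rather than the PR's implementation. An integer weight of k behaves like k copies of the instance, which is what bootstrap counts require.

```python
def weighted_gini(labels, weights):
    """Gini impurity where each instance contributes its weight, not a count of 1."""
    totals = {}
    for label, weight in zip(labels, weights):
        totals[label] = totals.get(label, 0.0) + weight
    total_weight = sum(totals.values())
    if total_weight == 0:
        return 0.0
    return 1.0 - sum((w / total_weight) ** 2 for w in totals.values())

# A weight of 2 on one instance matches duplicating that instance.
weighted = weighted_gini([0, 1], [2.0, 1.0])
duplicated = weighted_gini([0, 0, 1], [1.0, 1.0, 1.0])
```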

## Node
* Some vals changed to vars.
 * This is unfortunately a public API change (DeveloperApi).  This could be avoided by creating a LearningNode struct, but that would be awkward.

## RandomForestSuite
Please let me know if there are missing tests!

## BaggedPoint
This wraps TreePoint and holds bootstrap weights/counts.
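
A minimal sketch of the idea, in plain Python: each point is stored once together with one count per tree, instead of materializing a resampled dataset for every tree. The field names and the sampling scheme (drawing exactly n points with replacement per tree) are assumptions for illustration; the actual code may sample differently.

```python
import random
from dataclasses import dataclass
from typing import List

@dataclass
class BaggedPoint:
    datum: object                # the wrapped point (a TreePoint in the PR)
    subsample_counts: List[int]  # how often this point appears in each tree's sample

def bootstrap(data, num_trees, seed):
    """For each tree, draw len(data) points with replacement and record the counts."""
    rng = random.Random(seed)
    n = len(data)
    counts = [[0] * num_trees for _ in range(n)]
    for tree in range(num_trees):
        for _ in range(n):
            counts[rng.randrange(n)][tree] += 1
    return [BaggedPoint(datum, c) for datum, c in zip(data, counts)]

bagged = bootstrap(["a", "b", "c", "d"], num_trees=3, seed=0)
```

Each tree then treats `subsample_counts[tree]` as the weight of the point, which is why the impurity calculations need to accept instance weights.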

# Design decisions

* BaggedPoint: BaggedPoint is separate from TreePoint since it may be useful for other bagging algorithms later on.

* RandomForest public API: What options should be easily supported by the train* methods?  Should ALL options be in the Java-friendly constructors?  Should there be a constructor taking Strategy?

* Feature subsampling options: What options should be supported?  scikit-learn supports the same options, except for "onethird."  One option would be to allow users to specify fractions ("0.1"): the current options could be supported, and any unrecognized values would be parsed as Doubles in [0,1].

* Splits and bins are computed before bootstrapping, so all trees use the same discretization.

* One queue, instead of one queue per tree.
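
The fraction proposal in the feature subsampling bullet above could look roughly like the following sketch: known strategy names are resolved first, and anything else is parsed as a fraction in [0,1]. The function name, the exact set of named options ("all", "sqrt", "log2", "onethird"), and the rounding rule (ceiling, with at least 1 feature) are assumptions for illustration, not the PR's behavior.

```python
import math

def num_features_per_node(strategy, num_features):
    """Resolve a featureSubsetStrategy string to a number of features per node."""
    named = {
        "all": num_features,
        "sqrt": math.ceil(math.sqrt(num_features)),
        "log2": math.ceil(math.log2(num_features)),
        "onethird": math.ceil(num_features / 3.0),
    }
    if strategy in named:
        return max(1, named[strategy])
    # Unrecognized values are parsed as fractions; float() raises ValueError
    # if the string is not a number at all.
    fraction = float(strategy)
    if not 0.0 <= fraction <= 1.0:
        raise ValueError("fraction must be in [0, 1], got %r" % strategy)
    return max(1, math.ceil(fraction * num_features))

sqrt_count = num_features_per_node("sqrt", 100)
fraction_count = num_features_per_node("0.5", 10)
```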

CC: mengxr manishamde codedeft chouqin  Please let me know if you have suggestions---thanks!

Author: Joseph K. Bradley <joseph.kurata.bradley@gmail.com>
Author: qiping.lqp <qiping.lqp@alibaba-inc.com>
Author: chouqin <liqiping1991@gmail.com>

Closes #2435 from jkbradley/rfs-new and squashes the following commits:

c694174 [Joseph K. Bradley] Fixed typo
cc59d78 [Joseph K. Bradley] fixed imports
e25909f [Joseph K. Bradley] Simplified node group maps.  Specifically, created NodeIndexInfo to store node index in agg and feature subsets, and no longer create extra maps in findBestSplits
fbe9a1e [Joseph K. Bradley] Changed default featureSubsetStrategy to be sqrt for classification, onethird for regression.  Updated docs with references.
ef7c293 [Joseph K. Bradley] Updates based on code review.  Most substantial changes: * Simplified DTStatsAggregator * Made RandomForestModel.trees public * Added test for regression to RandomForestSuite
593b13c [Joseph K. Bradley] Fixed bug in metadata for computing log2(num features).  Now it checks >= 1.
a1a08df [Joseph K. Bradley] Removed old comments
866e766 [Joseph K. Bradley] Changed RandomForestSuite randomized tests to use multiple fixed random seeds.
ff8bb96 [Joseph K. Bradley] removed usage of null from RandomForest and replaced with Option
bf1a4c5 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into rfs-new
6b79c07 [Joseph K. Bradley] Added RandomForestSuite, and fixed small bugs, style issues.
d7753d4 [Joseph K. Bradley] Added numTrees and featureSubsetStrategy to DecisionTreeRunner (to support RandomForest).  Fixed bugs so that RandomForest now runs.
746d43c [Joseph K. Bradley] Implemented feature subsampling.  Tested DecisionTree but not RandomForest.
6309d1d [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into rfs-new.  Added RandomForestModel.toString
b7ae594 [Joseph K. Bradley] Updated docs.  Small fix for bug which does not cause errors: No longer allocate unused child nodes for leaf nodes.
121c74e [Joseph K. Bradley] Basic random forests are implemented.  Random features per node not yet implemented.  Test suite not implemented.
325d18a [Joseph K. Bradley] Merge branch 'chouqin-dt-preprune' into rfs-new
4ef9bf1 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into rfs-new
61b2e72 [Joseph K. Bradley] Added max of 10GB for maxMemoryInMB in Strategy.
a95e7c8 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into chouqin-dt-preprune
6da8571 [Joseph K. Bradley] RFs partly implemented, not done yet
eddd1eb [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into rfs-new
5c4ac33 [Joseph K. Bradley] Added check in Strategy to make sure minInstancesPerNode >= 1
0dd4d87 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into dt-spark-3160
95c479d [Joseph K. Bradley] * Fixed typo in tree suite test "do not choose split that does not satisfy min instance per node requirements" * small style fixes
e2628b6 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into chouqin-dt-preprune
19b01af [Joseph K. Bradley] Merge remote-tracking branch 'chouqin/dt-preprune' into chouqin-dt-preprune
f1d11d1 [chouqin] fix typo
c7ebaf1 [chouqin] fix typo
39f9b60 [chouqin] change edge `minInstancesPerNode` to 2 and add one more test
c6e2dfc [Joseph K. Bradley] Added minInstancesPerNode and minInfoGain parameters to DecisionTreeRunner.scala and to Python API in tree.py
306120f [Joseph K. Bradley] Fixed typo in DecisionTreeModel.scala doc
eaa1dcf [Joseph K. Bradley] Added topNode doc in DecisionTree and scalastyle fix
d4d7864 [Joseph K. Bradley] Marked Node.build as deprecated
d4dbb99 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into dt-spark-3160
1a8f0ad [Joseph K. Bradley] Eliminated pre-allocated nodes array in main train() method. * Nodes are constructed and added to the tree structure as needed during training.
0278a11 [chouqin] remove `noSplit` and set `Predict` private to tree
d593ec7 [chouqin] fix docs and change minInstancesPerNode to 1
2ab763b [Joseph K. Bradley] Simplifications to DecisionTree code:
efcc736 [qiping.lqp] fix bug
10b8012 [qiping.lqp] fix style
6728fad [qiping.lqp] minor fix: remove empty lines
bb465ca [qiping.lqp] Merge branch 'master' of https://github.com/apache/spark into dt-preprune
cadd569 [qiping.lqp] add api docs
46b891f [qiping.lqp] fix bug
e72c7e4 [qiping.lqp] add comments
845c6fa [qiping.lqp] fix style
f195e83 [qiping.lqp] fix style
987cbf4 [qiping.lqp] fix bug
ff34845 [qiping.lqp] separate calculation of predict of node from calculation of info gain
ac42378 [qiping.lqp] add min info gain and min instances per node parameters in decision tree
2014-09-28 21:44:50 -07:00
assembly [SPARK-3647] Add more exceptions to Guava relocation. 2014-09-23 13:42:00 -07:00
bagel SPARK-2482: Resolve sbt warnings during build 2014-09-11 18:44:35 -07:00
bin [SPARK-3547]Using a special exit code instead of 1 to represent ClassNotFoundExcepti... 2014-09-18 10:17:18 -07:00
conf [SPARK-3584] sbin/slaves doesn't work when we use password authentication for SSH 2014-09-25 16:49:15 -07:00
core [SPARK-3543] TaskContext remaining cleanup work. 2014-09-28 20:32:54 -07:00
data/mllib SPARK-2363. Clean MLlib's sample data files 2014-07-13 19:27:43 -07:00
dev [Build] Diff from branch point 2014-09-24 11:33:58 -07:00
docker [SPARK-1342] Scala 2.10.4 2014-04-01 18:35:50 -07:00
docs [SPARK-3715][Docs]minor typo 2014-09-28 18:30:13 -07:00
ec2 [SPARK-3659] Set EC2 version to 1.1.0 and update version map 2014-09-24 11:34:39 -07:00
examples [SPARK-1545] [mllib] Add Random Forests 2014-09-28 21:44:50 -07:00
external [SPARK-3686][STREAMING] Wait for sink to commit the channel before check... 2014-09-25 22:56:43 -07:00
extras SPARK-3639 | Removed settings master in examples 2014-09-26 09:48:46 -07:00
graphx [SPARK-3578] Fix upper bound in GraphGenerators.sampleLogNormal 2014-09-22 13:47:43 -07:00
mllib [SPARK-1545] [mllib] Add Random Forests 2014-09-28 21:44:50 -07:00
project SPARK-3699: SQL and Hive console tasks now clean up appropriately 2014-09-28 01:01:27 -07:00
python [SPARK-3681] [SQL] [PySpark] fix serialization of List and Map in SchemaRDD 2014-09-27 12:21:37 -07:00
repl [SPARK-3452] Maven build should skip publishing artifacts people shouldn... 2014-09-14 21:17:29 -07:00
sbin [SPARK-3584] sbin/slaves doesn't work when we use password authentication for SSH 2014-09-25 16:49:15 -07:00
sbt SPARK-3337 Paranoid quoting in shell to allow install dirs with spaces within. 2014-09-08 10:24:15 -07:00
sql [SPARK-3543] TaskContext remaining cleanup work. 2014-09-28 20:32:54 -07:00
streaming Minor cleanup to tighten visibility and remove compilation warning. 2014-09-27 00:57:26 -07:00
tools [SPARK-3433][BUILD] Fix for Mima false-positives with @DeveloperAPI and @Experimental annotations. 2014-09-15 21:14:00 -07:00
yarn [SPARK-3476] Remove outdated memory checks in Yarn 2014-09-26 11:50:48 -07:00
.gitignore [SPARK-3584] sbin/slaves doesn't work when we use password authentication for SSH 2014-09-25 16:49:15 -07:00
.rat-excludes [SPARK-3584] sbin/slaves doesn't work when we use password authentication for SSH 2014-09-25 16:49:15 -07:00
CONTRIBUTING.md [Docs] minor grammar fix 2014-09-17 12:33:09 -07:00
LICENSE [SPARK-3073] [PySpark] use external sort in sortBy() and sortByKey() 2014-08-26 16:57:40 -07:00
make-distribution.sh Slaves file is now a template. 2014-09-26 22:21:50 -07:00
NOTICE SPARK-1827. LICENSE and NOTICE files need a refresh to contain transitive dependency info 2014-05-14 09:38:33 -07:00
pom.xml [Build]remove spark-staging-1030 2014-09-26 22:23:49 -07:00
README.md [Docs] minor punctuation fix 2014-09-16 11:48:20 -07:00
scalastyle-config.xml [SPARK-2182] Scalastyle rule blocking non ascii characters. 2014-09-16 09:21:03 -07:00
tox.ini [SPARK-3073] [PySpark] use external sort in sortBy() and sortByKey() 2014-08-26 16:57:40 -07:00

Apache Spark

Spark is a fast and general cluster computing system for Big Data. It provides high-level APIs in Scala, Java, and Python, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for stream processing.

http://spark.apache.org/

Online Documentation

You can find the latest Spark documentation, including a programming guide, on the project web page. This README file only contains basic setup instructions.

Building Spark

Spark is built using Apache Maven. To build Spark and its example programs, run:

mvn -DskipTests clean package

(You do not need to do this if you downloaded a pre-built package.) More detailed documentation is available from the project site, at "Building Spark".

Interactive Scala Shell

The easiest way to start using Spark is through the Scala shell:

./bin/spark-shell

Try the following command, which should return 1000:

scala> sc.parallelize(1 to 1000).count()

Interactive Python Shell

Alternatively, if you prefer Python, you can use the Python shell:

./bin/pyspark

And run the following command, which should also return 1000:

>>> sc.parallelize(range(1000)).count()

Example Programs

Spark also comes with several sample programs in the examples directory. To run one of them, use ./bin/run-example <class> [params]. For example:

./bin/run-example SparkPi

will run the Pi example locally.

You can set the MASTER environment variable when running examples to submit them to a cluster. This can be a mesos:// or spark:// URL, "yarn-cluster" or "yarn-client" to run on YARN, "local" to run locally with one thread, or "local[N]" to run locally with N threads. You can also use an abbreviated class name if the class is in the examples package. For instance:

MASTER=spark://host:7077 ./bin/run-example SparkPi

Many of the example programs print usage help if no params are given.

Running Tests

Testing first requires building Spark. Once Spark is built, tests can be run using:

./dev/run-tests

Please see the guidance on how to run all automated tests.

A Note About Hadoop Versions

Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported storage systems. Because the protocols have changed in different versions of Hadoop, you must build Spark against the same version that your cluster runs.

Please refer to the build documentation at "Specifying the Hadoop Version" for detailed guidance on building for a particular distribution of Hadoop, including building for particular Hive and Hive Thriftserver distributions. See also "Third Party Hadoop Distributions" for guidance on building a Spark application that works with a particular distribution.

Configuration

Please refer to the Configuration guide in the online documentation for an overview on how to configure Spark.