Apache Spark - A unified analytics engine for large-scale data processing
Yin Huai f2c47082c3 [SPARK-1442] [SQL] Window Function Support for Spark SQL
Adding more information about the implementation...

This PR adds support for window functions to Spark SQL (specifically, the OVER and WINDOW clauses). For every expression having an OVER clause, we use a WindowExpression as the container of a WindowFunction and the corresponding WindowSpecDefinition (the definition of a window frame, i.e. the partition specification, order specification, and frame specification appearing in an OVER clause).
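
For example, in a query such as the following (using a hypothetical `sales` table, the same one as in the plans below), `partition by product` is the partition specification, `order by year` is the order specification, and the `rows between` clause is the frame specification; the analyzer wraps the `avg(sales)` call and this WindowSpecDefinition together in a single WindowExpression:

```
sql("""
SELECT
  year, country, product, sales,
  avg(sales) over(
    partition by product
    order by year
    rows between 1 preceding and current row
  ) avg_recent
FROM sales
""")
```
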
# Implementation #
The high-level workflow of the implementation is as follows.

*	Query parsing: During query parsing, all WindowExpressions are originally placed in the projectList of a Project operator or the aggregateExpressions of an Aggregate operator. This keeps our changes simple and keeps all parsing rules for window functions in a single place (nodesToWindowSpecification). For the WINDOW clause in a query, we use a WithWindowDefinition as the container of the mapping from the name of a window specification to its WindowSpecDefinition. This change is similar to our common table expression support. For example:
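
The named window specification below is what WithWindowDefinition captures: the name `w` maps to a WindowSpecDefinition, and each `over w` becomes a WindowSpecReference to be resolved during analysis (an illustrative sketch, again using the hypothetical `sales` table):

```
sql("""
SELECT
  product, sales,
  avg(sales) over w,
  sum(sales) over w
FROM sales
WINDOW w AS (partition by product order by year)
""")
```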

*	Analysis: The query analysis process has three steps for window functions.

 *	Resolve all WindowSpecReferences by replacing them with the corresponding WindowSpecDefinitions according to the mapping table stored in the WithWindowDefinition node.
 *	Resolve WindowFunctions in the projectList of a Project operator or the aggregateExpressions of an Aggregate operator. For this PR, we use Hive's implementations of window functions, because we will have a major refactoring of our internal UDAFs and it is better to switch our UDAFs over after that refactoring work.
 *	Once all WindowFunctions are resolved, we use ResolveWindowFunction to extract WindowExpressions from the projectList and aggregateExpressions, and then create a Window operator for every distinct WindowSpecDefinition. With this choice, at execution time we can rely on the Exchange operator to do all of the work of reorganizing the data, so we do not need to worry about it in the physical Window operator. An example analyzed plan is shown below:

```
sql("""
SELECT
  year, country, product, sales,
  avg(sales) over(partition by product) avg_product,
  sum(sales) over(partition by country) sum_country
FROM sales
ORDER BY year, country, product
""").explain(true)

== Analyzed Logical Plan ==
Sort [year#34 ASC,country#35 ASC,product#36 ASC], true
 Project [year#34,country#35,product#36,sales#37,avg_product#27,sum_country#28]
  Window [year#34,country#35,product#36,sales#37,avg_product#27], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum(sales#37) WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS sum_country#28], WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
   Window [year#34,country#35,product#36,sales#37], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage(sales#37) WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS avg_product#27], WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    Project [year#34,country#35,product#36,sales#37]
     MetastoreRelation default, sales, None
```

*	Query planning: During query planning, we simply generate the physical Window operator from the logical Window operator. Then, to prepare the executedPlan, the EnsureRequirements rule adds Exchange and Sort operators where necessary; it analyzes the data properties and tries to avoid adding unnecessary shuffles and sorts. The physical plan for the above example query is shown below.

```
== Physical Plan ==
Sort [year#34 ASC,country#35 ASC,product#36 ASC], true
 Exchange (RangePartitioning [year#34 ASC,country#35 ASC,product#36 ASC], 200), []
  Window [year#34,country#35,product#36,sales#37,avg_product#27], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum(sales#37) WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS sum_country#28], WindowSpecDefinition [country#35], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
   Exchange (HashPartitioning [country#35], 200), [country#35 ASC]
    Window [year#34,country#35,product#36,sales#37], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage(sales#37) WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS avg_product#27], WindowSpecDefinition [product#36], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
     Exchange (HashPartitioning [product#36], 200), [product#36 ASC]
      HiveTableScan [year#34,country#35,product#36,sales#37], (MetastoreRelation default, sales, None), None
```

*	Execution time: At execution time, a physical Window operator buffers all rows of a partition specified in the partition spec of an OVER clause. If necessary, it also maintains a sliding window frame. The current implementation tries to buffer the input parameters of a window function according to the window frame, to avoid evaluating a row multiple times. A rough sketch of the buffering idea is shown below.
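
As a sketch of this buffering idea (a hypothetical standalone helper, not the actual physical operator), a running sum over the frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW can buffer one partition and then evaluate each output row in a single pass:

```
import scala.collection.mutable.ArrayBuffer

// Illustrative sketch only: buffer all rows of one partition, then compute a
// running sum (frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).
def runningSum(partition: Iterator[Double]): Iterator[Double] = {
  val buffered = new ArrayBuffer[Double]()
  partition.foreach(buffered += _) // the operator buffers the whole partition
  var acc = 0.0
  buffered.iterator.map { v => acc += v; acc } // one evaluation per row
}
```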

# Future work #

Here are three improvements that are not hard to add:
*	Taking advantage of the window frame specification to reduce the number of rows buffered in the physical Window operator. In some cases, we only need to buffer the rows appearing in the sliding window; in other cases (e.g. ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), we will not be able to reduce the number of rows buffered.

*	When a RANGE frame is used, for <value> PRECEDING and <value> FOLLOWING, it would be great if the <value> part could be an expression (we can start with Literal). Then, when the data type of the ORDER BY expression is a FractionalType, we can support FractionalType as the type of <value> (<value> still needs to evaluate to a positive value).

*	When a RANGE frame is used, we need to support DateType and TimestampType as the data type of the expression appearing in the order specification. Then, the <value> part of <value> PRECEDING and <value> FOLLOWING can support interval types (once we support them). Sketches of both items follow.
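
For reference, the last two items would enable frame bounds like the following (neither is supported by this PR; these strings only sketch the target syntax, and order_date is a hypothetical column):

```
// A FractionalType literal as <value> in a RANGE frame:
val fractionalBound =
  "avg(sales) over (order by sales range between 2.5 preceding and current row)"
// An interval as <value> with a DateType order specification (interval types
// are not supported yet either):
val intervalBound =
  "avg(sales) over (order by order_date range between interval 7 days preceding and current row)"
```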

This is joint work with guowei2 and yhuai.
Thanks to hbutani and hvanhovell for their comments.
Thanks to scwf for his comments and unit tests.

Author: Yin Huai <yhuai@databricks.com>

Closes #5604 from guowei2/windowImplement and squashes the following commits:

76fe1c8 [Yin Huai] Implementation.
aa2b0ae [Yin Huai] Tests.
| Path | Latest commit | Date |
| --- | --- | --- |
| assembly | [SPARK-7168] [BUILD] Update plugin versions in Maven build and centralize versions | 2015-04-28 07:48:34 -04:00 |
| bagel | [SPARK-6758]block the right jetty package in log | 2015-04-09 17:44:08 -04:00 |
| bin | Limit help option regex | 2015-05-01 19:26:55 +01:00 |
| build | SPARK-5856: In Maven build script, launch Zinc with more memory | 2015-02-17 10:10:01 -08:00 |
| conf | [SPARK-2691] [MESOS] Support for Mesos DockerInfo | 2015-05-01 18:41:22 -07:00 |
| core | [SPARK-7384][Core][Tests] Fix flaky tests for distributed mode in BroadcastSuite | 2015-05-05 23:25:28 -07:00 |
| data/mllib | [SPARK-5939][MLLib] make FPGrowth example app take parameters | 2015-02-23 08:47:28 -08:00 |
| dev | [MINOR] Fix python test typo? | 2015-05-04 17:17:55 +01:00 |
| docker | [SPARK-2691] [MESOS] Support for Mesos DockerInfo | 2015-05-01 18:41:22 -07:00 |
| docs | Revert "[SPARK-3454] separate json endpoints for data in the UI" | 2015-05-05 19:27:30 -07:00 |
| ec2 | [SPARK-4897] [PySpark] Python 3 support | 2015-04-16 16:20:57 -07:00 |
| examples | [SPARK-6612] [MLLIB] [PYSPARK] Python KMeans parity | 2015-05-05 07:57:39 -07:00 |
| external | [SPARK-7113] [STREAMING] Support input information reporting for Direct Kafka stream | 2015-05-05 02:01:06 -07:00 |
| extras | [SPARK-6440][CORE]Handle IPv6 addresses properly when constructing URI | 2015-04-13 12:55:25 +01:00 |
| graphx | [SPARK-5854] personalized page rank | 2015-05-01 11:55:43 -07:00 |
| launcher | [SPARK-7031] [THRIFTSERVER] let thrift server take SPARK_DAEMON_MEMORY and SPARK_DAEMON_JAVA_OPTS | 2015-05-03 00:47:47 +01:00 |
| mllib | [SPARK-6940] [MLLIB] Add CrossValidator to Python ML pipeline API | 2015-05-06 01:28:43 -07:00 |
| network | [SPARK-6229] Add SASL encryption to network library. | 2015-05-01 19:01:46 -07:00 |
| project | [Build] Enable MiMa checks for SQL | 2015-04-30 16:23:01 -07:00 |
| python | [SPARK-6940] [MLLIB] Add CrossValidator to Python ML pipeline API | 2015-05-06 01:28:43 -07:00 |
| R | [SPARK-6841] [SPARKR] add support for mean, median, stdev etc. | 2015-05-05 20:39:56 -07:00 |
| repl | [SPARK-7092] Update spark scala version to 2.11.6 | 2015-04-25 18:07:34 -04:00 |
| sbin | [SPARK-5338] [MESOS] Add cluster mode support for Mesos | 2015-04-28 13:33:57 -07:00 |
| sbt | Adde LICENSE Header to build/mvn, build/sbt and sbt/sbt | 2014-12-29 10:48:53 -08:00 |
| sql | [SPARK-1442] [SQL] Window Function Support for Spark SQL | 2015-05-06 10:43:00 -07:00 |
| streaming | [SPARK-6939] [STREAMING] [WEBUI] Add timeline and histogram graphs for streaming statistics | 2015-05-05 12:52:16 -07:00 |
| tools | [SPARK-4550] In sort-based shuffle, store map outputs in serialized form | 2015-04-30 23:14:14 -07:00 |
| unsafe | [MINOR] Minor update for document | 2015-05-05 14:44:02 +01:00 |
| yarn | [SPARK-6653] [YARN] New config to specify port for sparkYarnAM actor system | 2015-05-05 11:09:51 +01:00 |
| .gitattributes | [SPARK-3870] EOL character enforcement | 2014-10-31 12:39:52 -07:00 |
| .gitignore | [SPARK-5654] Integrate SparkR | 2015-04-08 22:45:40 -07:00 |
| .rat-excludes | Revert "[SPARK-3454] separate json endpoints for data in the UI" | 2015-05-05 19:27:30 -07:00 |
| CONTRIBUTING.md | [SPARK-6889] [DOCS] CONTRIBUTING.md updates to accompany contribution doc updates | 2015-04-21 22:34:31 -07:00 |
| LICENSE | [SPARK-6939] [STREAMING] [WEBUI] Add timeline and histogram graphs for streaming statistics | 2015-05-05 12:52:16 -07:00 |
| make-distribution.sh | [SPARK-7302] [DOCS] SPARK building documentation still mentions building for yarn 0.23 | 2015-05-03 21:22:31 +01:00 |
| NOTICE | SPARK-1827. LICENSE and NOTICE files need a refresh to contain transitive dependency info | 2014-05-14 09:38:33 -07:00 |
| pom.xml | Revert "[SPARK-3454] separate json endpoints for data in the UI" | 2015-05-05 19:27:30 -07:00 |
| README.md | [docs] [SPARK-6306] Readme points to dead link | 2015-03-12 15:01:33 +00:00 |
| scalastyle-config.xml | [SPARK-6428] Turn on explicit type checking for public methods. | 2015-04-03 01:25:02 -07:00 |
| tox.ini | [SPARK-3073] [PySpark] use external sort in sortBy() and sortByKey() | 2014-08-26 16:57:40 -07:00 |

# Apache Spark

Spark is a fast and general cluster computing system for Big Data. It provides high-level APIs in Scala, Java, and Python, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for stream processing.

http://spark.apache.org/

## Online Documentation

You can find the latest Spark documentation, including a programming guide, on the project web page and project wiki. This README file only contains basic setup instructions.

## Building Spark

Spark is built using Apache Maven. To build Spark and its example programs, run:

```
mvn -DskipTests clean package
```

(You do not need to do this if you downloaded a pre-built package.) More detailed documentation is available from the project site, at "Building Spark".

## Interactive Scala Shell

The easiest way to start using Spark is through the Scala shell:

```
./bin/spark-shell
```

Try the following command, which should return 1000:

```
scala> sc.parallelize(1 to 1000).count()
```

## Interactive Python Shell

Alternatively, if you prefer Python, you can use the Python shell:

```
./bin/pyspark
```

And run the following command, which should also return 1000:

```
>>> sc.parallelize(range(1000)).count()
```

## Example Programs

Spark also comes with several sample programs in the examples directory. To run one of them, use `./bin/run-example <class> [params]`. For example:

```
./bin/run-example SparkPi
```

will run the Pi example locally.

You can set the MASTER environment variable when running examples to submit examples to a cluster. This can be a mesos:// or spark:// URL, "yarn-cluster" or "yarn-client" to run on YARN, "local" to run locally with one thread, or "local[N]" to run locally with N threads. You can also use an abbreviated class name if the class is in the examples package. For instance:

```
MASTER=spark://host:7077 ./bin/run-example SparkPi
```

Many of the example programs print usage help if no params are given.

## Running Tests

Testing first requires building Spark. Once Spark is built, tests can be run using:

```
./dev/run-tests
```

Please see the guidance on how to run all automated tests.

## A Note About Hadoop Versions

Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported storage systems. Because the protocols have changed in different versions of Hadoop, you must build Spark against the same version that your cluster runs.

Please refer to the build documentation at "Specifying the Hadoop Version" for detailed guidance on building for a particular distribution of Hadoop, including building for particular Hive and Hive Thriftserver distributions. See also "Third Party Hadoop Distributions" for guidance on building a Spark application that works with a particular distribution.
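
For example, a build pinned to a particular Hadoop release might look like the following (the exact profiles and version properties for your distribution are listed in the documents above):

```
mvn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
```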

## Configuration

Please refer to the Configuration guide in the online documentation for an overview on how to configure Spark.