c99bcb7fea
An initial API that exposes Spark SQL functionality in PySpark. A PythonRDD composed of dictionaries, with string keys and primitive values (boolean, float, int, long, string), can be converted into a SchemaRDD that supports SQL queries.

```python
from pyspark.context import SQLContext

sqlCtx = SQLContext(sc)
rdd = sc.parallelize([{"field1": 1, "field2": "row1"},
                      {"field1": 2, "field2": "row2"},
                      {"field1": 3, "field2": "row3"}])
srdd = sqlCtx.applySchema(rdd)
sqlCtx.registerRDDAsTable(srdd, "table1")
srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 AS f2 FROM table1")
srdd2.collect()
```

The last line yields:

```python
[{"f1": 1, "f2": "row1"}, {"f1": 2, "f2": "row2"}, {"f1": 3, "f2": "row3"}]
```

Author: Ahir Reddy <ahirreddy@gmail.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #363 from ahirreddy/pysql and squashes the following commits:

0294497 [Ahir Reddy] Updated log4j properties to suppress Hive warnings
307d6e0 [Ahir Reddy] Style fix
6f7b8f6 [Ahir Reddy] Temporary fix for the MIMA checker. Since we now assemble the Spark jar with Hive, we don't want to check the interfaces of all of our Hive dependencies
3ef074a [Ahir Reddy] Updated documentation because classes moved to sql.py
29245bf [Ahir Reddy] Cache underlying SchemaRDD instead of generating and caching PythonRDD
f2312c7 [Ahir Reddy] Moved everything into sql.py
a19afe4 [Ahir Reddy] Doc fixes
6d658ba [Ahir Reddy] Remove the metastore directory created by the HiveContext tests in Spark SQL
521ff6d [Ahir Reddy] Trying to get Spark to build with Hive
ab95eba [Ahir Reddy] Set SPARK_HIVE=true on Jenkins
ded03e7 [Ahir Reddy] Added doc test for HiveContext
22de1d4 [Ahir Reddy] Fixed Maven pyrolite dependency
e4da06c [Ahir Reddy] Display a message if Hive is not built into Spark
227a0be [Michael Armbrust] Update API links. Fix Hive example.
58e2aa9 [Michael Armbrust] Build docs for the PySpark SQL API. Minor fixes.
4285340 [Michael Armbrust] Fix building of Hive API docs.
38a92b0 [Michael Armbrust] Add a note to future non-Python developers about the Python docs.
337b201 [Ahir Reddy] Changed com.clearspring.analytics stream version from 2.4.0 to 2.5.1 to match the SBT build, and added pyrolite to the Maven build
40491c9 [Ahir Reddy] PR changes + method visibility
1836944 [Michael Armbrust] Fix comments.
e00980f [Michael Armbrust] First draft of the Python SQL programming guide.
b0192d3 [Ahir Reddy] Added Long, Double and Boolean as usable types + unit test
f98a422 [Ahir Reddy] HiveContexts
79621cf [Ahir Reddy] Cleaning up cruft
b406ba0 [Ahir Reddy] Doctest formatting
20936a5 [Ahir Reddy] Added tests and documentation
e4d21b4 [Ahir Reddy] Added pyrolite dependency
79f739d [Ahir Reddy] Added more tests
7515ba0 [Ahir Reddy] Added more tests :)
d26ec5e [Ahir Reddy] Added test
e9f5b8d [Ahir Reddy] Adding tests
906d180 [Ahir Reddy] Added a TODO explaining the cost of creating Row objects in Python
251f99d [Ahir Reddy] For now only allow dictionaries as input
09b9980 [Ahir Reddy] Made jrdd explicitly lazy
c608947 [Ahir Reddy] SchemaRDD now has all RDD operations
725c91e [Ahir Reddy] Awesome Row objects
55d1c76 [Ahir Reddy] Return Row objects
4fe1319 [Ahir Reddy] Output dictionaries correctly
be079de [Ahir Reddy] Returning dictionaries works
cd5f79f [Ahir Reddy] Switched to using the Scala SQLContext
e948bd9 [Ahir Reddy] Yippie
4886052 [Ahir Reddy] Even better
c0fb1c6 [Ahir Reddy] More working
043ca85 [Ahir Reddy] Working
5496f9f [Ahir Reddy] Doesn't crash
b8b904b [Ahir Reddy] Added SchemaRDD class
67ba875 [Ahir Reddy] Java to Python, and Python to Java
bcc0f23 [Ahir Reddy] Java to Python
ab6025d [Ahir Reddy] Compiling
---
layout: global
title: Spark SQL Programming Guide
---

**Spark SQL is currently an Alpha component. Therefore, the APIs may be changed in future releases.**

* This will become a table of contents (this text will be scraped).
{:toc}

# Overview
<div class="codetabs">

<div data-lang="scala" markdown="1">

Spark SQL allows relational queries expressed in SQL, HiveQL, or Scala to be executed using
Spark. At the core of this component is a new type of RDD,
[SchemaRDD](api/sql/core/index.html#org.apache.spark.sql.SchemaRDD). SchemaRDDs are composed of
[Row](api/sql/catalyst/index.html#org.apache.spark.sql.catalyst.expressions.Row) objects, along with
a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table
in a traditional relational database. A SchemaRDD can be created from an existing RDD, a Parquet
file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).

**All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell`.**

</div>
<div data-lang="java" markdown="1">

Spark SQL allows relational queries expressed in SQL, HiveQL, or Scala to be executed using
Spark. At the core of this component is a new type of RDD,
[JavaSchemaRDD](api/sql/core/index.html#org.apache.spark.sql.api.java.JavaSchemaRDD). JavaSchemaRDDs are composed of
[Row](api/sql/catalyst/index.html#org.apache.spark.sql.api.java.Row) objects, along with
a schema that describes the data types of each column in the row. A JavaSchemaRDD is similar to a table
in a traditional relational database. A JavaSchemaRDD can be created from an existing RDD, a Parquet
file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).

</div>
<div data-lang="python" markdown="1">

Spark SQL allows relational queries expressed in SQL or HiveQL to be executed using
Spark. At the core of this component is a new type of RDD,
[SchemaRDD](api/pyspark/pyspark.sql.SchemaRDD-class.html). SchemaRDDs are composed of
[Row](api/pyspark/pyspark.sql.Row-class.html) objects, along with
a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table
in a traditional relational database. A SchemaRDD can be created from an existing RDD, a Parquet
file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).

**All of the examples on this page use sample data included in the Spark distribution and can be run in the `pyspark` shell.**

</div>

</div>
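As a rough mental model of the SchemaRDD described above (rows paired with a schema that gives each column's name and type), here is a plain-Python sketch. The `schema` list and `conforms` helper are purely illustrative and not part of any Spark API:

```python
# A schema describes each column's name and type; rows hold the values.
# Purely illustrative -- not the actual SchemaRDD or Row classes.
schema = [("name", str), ("age", int)]
rows = [("Michael", 29), ("Andy", 30), ("Justin", 19)]

def conforms(rows, schema):
    # Every row must have one value per column, of the declared type.
    return all(
        len(row) == len(schema)
        and all(isinstance(v, t) for v, (_, t) in zip(row, schema))
        for row in rows
    )

print(conforms(rows, schema))  # True
```

Spark's query planner relies on exactly this pairing: the schema tells it what operations are valid on each column without inspecting the data itself.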
***************************************************************************************************

# Getting Started

<div class="codetabs">

<div data-lang="scala" markdown="1">

The entry point into all relational functionality in Spark is the
[SQLContext](api/sql/core/index.html#org.apache.spark.sql.SQLContext) class, or one of its
descendants. To create a basic SQLContext, all you need is a SparkContext.

{% highlight scala %}
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
import sqlContext._
{% endhighlight %}

</div>
<div data-lang="java" markdown="1">

The entry point into all relational functionality in Spark is the
[JavaSQLContext](api/sql/core/index.html#org.apache.spark.sql.api.java.JavaSQLContext) class, or one
of its descendants. To create a basic JavaSQLContext, all you need is a JavaSparkContext.

{% highlight java %}
JavaSparkContext ctx = ...; // An existing JavaSparkContext.
JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx);
{% endhighlight %}

</div>
<div data-lang="python" markdown="1">

The entry point into all relational functionality in Spark is the
[SQLContext](api/pyspark/pyspark.sql.SQLContext-class.html) class, or one
of its descendants. To create a basic SQLContext, all you need is a SparkContext.

{% highlight python %}
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
{% endhighlight %}

</div>

</div>
## Running SQL on RDDs

<div class="codetabs">

<div data-lang="scala" markdown="1">

One type of table that is supported by Spark SQL is an RDD of Scala case classes. The case class
defines the schema of the table. The names of the arguments to the case class are read using
reflection and become the names of the columns. Case classes can also be nested or contain complex
types such as Sequences or Arrays. This RDD can be implicitly converted to a SchemaRDD and then be
registered as a table. Tables can be used in subsequent SQL statements.

{% highlight scala %}
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

// Define the schema using a case class.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.registerAsTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
{% endhighlight %}

</div>
<div data-lang="java" markdown="1">

One type of table that is supported by Spark SQL is an RDD of [JavaBeans](http://stackoverflow.com/questions/3295496/what-is-a-javabean-exactly). The BeanInfo
defines the schema of the table. Currently, Spark SQL does not support JavaBeans that contain
nested or complex types such as Lists or Arrays. You can create a JavaBean by creating a
class that implements Serializable and has public getters and setters for all of its fields.

{% highlight java %}

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

{% endhighlight %}
A schema can be applied to an existing RDD by calling `applySchema` and providing the Class object
for the JavaBean.

{% highlight java %}
JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx); // ctx is an existing JavaSparkContext.

// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").map(
  new Function<String, Person>() {
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");

      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));

      return person;
    }
  });

// Apply a schema to an RDD of JavaBeans and register it as a table.
JavaSchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
schemaPeople.registerAsTable("people");

// SQL can be run over RDDs that have been registered as tables.
JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

{% endhighlight %}

</div>
<div data-lang="python" markdown="1">

One type of table that is supported by Spark SQL is an RDD of dictionaries. The keys of the
dictionary define the column names of the table, and the types are inferred by looking at the first
row. Any RDD of dictionaries can be converted to a SchemaRDD and then registered as a table. Tables
can be used in subsequent SQL statements.

{% highlight python %}
# Load a text file and convert each line to a dictionary.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: {"name": p[0], "age": int(p[1])})

# Infer the schema, and register the SchemaRDD as a table.
# In future versions of PySpark we would like to add support for registering RDDs with other
# datatypes as tables.
peopleTable = sqlCtx.inferSchema(people)
peopleTable.registerAsTable("people")

# SQL can be run over SchemaRDDs that have been registered as a table.
teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are RDDs and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
{% endhighlight %}
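The inference rule described above can be sketched in plain Python: column names come from the dictionary keys, and column types come from the values of the first row. The `infer_schema` helper below is hypothetical, for illustration only, and not part of the PySpark API:

```python
def infer_schema(rows):
    # Only the first dictionary is inspected: its keys become the
    # column names and the types of its values become the column types.
    # Hypothetical helper -- not part of PySpark.
    first = rows[0]
    return {key: type(value) for key, value in first.items()}

people = [{"name": "Michael", "age": 29}, {"name": "Andy", "age": 30}]
print(infer_schema(people))  # {'name': <class 'str'>, 'age': <class 'int'>}
```

Because only the first row is inspected, every subsequent dictionary is expected to have the same keys with values of the same types.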
</div>

</div>

**Note that Spark SQL currently uses a very basic SQL parser.**
Users who want a more complete dialect of SQL should look at the HiveQL support provided by
`HiveContext`.
## Using Parquet

Parquet is a columnar format that is supported by many other data processing systems. Spark SQL
provides support for both reading and writing Parquet files that automatically preserves the schema
of the original data. Using the data from the above example:

<div class="codetabs">

<div data-lang="scala" markdown="1">

{% highlight scala %}
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a SchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")

// Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a SchemaRDD.
val parquetFile = sqlContext.parquetFile("people.parquet")

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerAsTable("parquetFile")
val teenagers = sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.collect().foreach(println)
{% endhighlight %}

</div>
<div data-lang="java" markdown="1">

{% highlight java %}

JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example.

// JavaSchemaRDDs can be saved as Parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile("people.parquet");

// Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a JavaSchemaRDD.
JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerAsTable("parquetFile");
JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");

{% endhighlight %}

</div>
<div data-lang="python" markdown="1">

{% highlight python %}

peopleTable # The SchemaRDD from the previous example.

# SchemaRDDs can be saved as Parquet files, maintaining the schema information.
peopleTable.saveAsParquetFile("people.parquet")

# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a Parquet file is also a SchemaRDD.
parquetFile = sqlCtx.parquetFile("people.parquet")

# Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerAsTable("parquetFile")
teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")

{% endhighlight %}

</div>

</div>
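As a rough mental model of the columnar layout mentioned above, the sketch below pivots row-oriented records into one list of values per column, which is the general shape a format like Parquet stores on disk. The `to_columns` helper is purely illustrative and not how Spark actually writes Parquet files:

```python
def to_columns(rows):
    # Pivot row-oriented dictionaries into one list of values per
    # column -- the shape a columnar format like Parquet stores on disk.
    # Illustrative helper only, not part of any Spark API.
    columns = {}
    for row in rows:
        for key, value in row.items():
            columns.setdefault(key, []).append(value)
    return columns

people = [{"name": "Michael", "age": 29}, {"name": "Andy", "age": 30}]
print(to_columns(people))  # {'name': ['Michael', 'Andy'], 'age': [29, 30]}
```

Storing values column by column is what lets a query such as `SELECT name FROM parquetFile` read only the `name` column rather than every full row.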
## Writing Language-Integrated Relational Queries

**Language-integrated queries are currently only supported in Scala.**

Spark SQL also supports a domain specific language for writing queries. Once again,
using the data from the above examples:

{% highlight scala %}
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val people: RDD[Person] = ... // An RDD of case class objects, from the first example.

// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
val teenagers = people.where('age >= 10).where('age <= 19).select('name)
{% endhighlight %}

The DSL uses Scala symbols to represent columns in the underlying table, which are identifiers
prefixed with a tick (`'`). Implicit conversions turn these symbols into expressions that are
evaluated by the SQL execution engine. A full list of the functions supported can be found in the
[ScalaDoc](api/sql/core/index.html#org.apache.spark.sql.SchemaRDD).

<!-- TODO: Include the table of operations here. -->
# Hive Support

Spark SQL also supports reading and writing data stored in [Apache Hive](http://hive.apache.org/).
However, since Hive has a large number of dependencies, it is not included in the default Spark assembly.
In order to use Hive you must first run '`SPARK_HIVE=true sbt/sbt assembly/assembly`' (or use `-Phive` for Maven).
This command builds a new assembly jar that includes Hive. Note that this Hive assembly jar must also be present
on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries
(SerDes) in order to access data stored in Hive.

Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.
<div class="codetabs">

<div data-lang="scala" markdown="1">

When working with Hive one must construct a `HiveContext`, which inherits from `SQLContext`, and
adds support for finding tables in the MetaStore and writing queries using HiveQL. Users who do
not have an existing Hive deployment can also experiment with the `LocalHiveContext`,
which is similar to `HiveContext`, but creates a local copy of the `metastore` and `warehouse`
automatically.

{% highlight scala %}
val sc: SparkContext // An existing SparkContext.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
import hiveContext._

hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL.
hql("FROM src SELECT key, value").collect().foreach(println)
{% endhighlight %}

</div>
<div data-lang="java" markdown="1">

When working with Hive one must construct a `JavaHiveContext`, which inherits from `JavaSQLContext`, and
adds support for finding tables in the MetaStore and writing queries using HiveQL. In addition to
the `sql` method a `JavaHiveContext` also provides an `hql` method, which allows queries to be
expressed in HiveQL.

{% highlight java %}
JavaSparkContext ctx = ...; // An existing JavaSparkContext.
JavaHiveContext hiveCtx = new org.apache.spark.sql.hive.api.java.JavaHiveContext(ctx);

hiveCtx.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
hiveCtx.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL.
Row[] results = hiveCtx.hql("FROM src SELECT key, value").collect();

{% endhighlight %}

</div>
<div data-lang="python" markdown="1">

When working with Hive one must construct a `HiveContext`, which inherits from `SQLContext`, and
adds support for finding tables in the MetaStore and writing queries using HiveQL. In addition to
the `sql` method a `HiveContext` also provides an `hql` method, which allows queries to be
expressed in HiveQL.

{% highlight python %}

from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)

hiveCtx.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hiveCtx.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results = hiveCtx.hql("FROM src SELECT key, value").collect()

{% endhighlight %}

</div>