---
layout: global
displayTitle: Spark SQL Programming Guide
title: Spark SQL
---

* This will become a table of contents (this text will be scraped).
{:toc}

# Overview

<div class="codetabs">
<div data-lang="scala" markdown="1">

Spark SQL allows relational queries expressed in SQL, HiveQL, or Scala to be executed using
Spark. At the core of this component is a new type of RDD,
[SchemaRDD](api/scala/index.html#org.apache.spark.sql.SchemaRDD). SchemaRDDs are composed of
[Row](api/scala/index.html#org.apache.spark.sql.package@Row:org.apache.spark.sql.catalyst.expressions.Row.type) objects, along with
a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table
in a traditional relational database. A SchemaRDD can be created from an existing RDD, a [Parquet](http://parquet.io)
file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).

All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell`.

</div>

<div data-lang="java" markdown="1">
Spark SQL allows relational queries expressed in SQL or HiveQL to be executed using
Spark. At the core of this component is a new type of RDD,
[JavaSchemaRDD](api/scala/index.html#org.apache.spark.sql.api.java.JavaSchemaRDD). JavaSchemaRDDs are composed of
[Row](api/scala/index.html#org.apache.spark.sql.api.java.Row) objects, along with
a schema that describes the data types of each column in the row. A JavaSchemaRDD is similar to a table
in a traditional relational database. A JavaSchemaRDD can be created from an existing RDD, a [Parquet](http://parquet.io)
file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
</div>

<div data-lang="python" markdown="1">

Spark SQL allows relational queries expressed in SQL or HiveQL to be executed using
Spark. At the core of this component is a new type of RDD,
[SchemaRDD](api/python/pyspark.sql.SchemaRDD-class.html). SchemaRDDs are composed of
[Row](api/python/pyspark.sql.Row-class.html) objects, along with
a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table
in a traditional relational database. A SchemaRDD can be created from an existing RDD, a [Parquet](http://parquet.io)
file, a JSON dataset, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).

All of the examples on this page use sample data included in the Spark distribution and can be run in the `pyspark` shell.
</div>
</div>

**Spark SQL is currently an alpha component. While we will minimize API changes, some APIs may change in future releases.**

***************************************************************************************************

# Getting Started

<div class="codetabs">
<div data-lang="scala" markdown="1">

The entry point into all relational functionality in Spark is the
[SQLContext](api/scala/index.html#org.apache.spark.sql.SQLContext) class, or one of its
descendants. To create a basic SQLContext, all you need is a SparkContext.

{% highlight scala %}
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD
{% endhighlight %}

In addition to the basic SQLContext, you can also create a HiveContext, which provides a
superset of the functionality provided by the basic SQLContext. Additional features include
the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the
ability to read data from Hive tables. To use a HiveContext, you do not need to have an
existing Hive setup, and all of the data sources available to a SQLContext are still available.
HiveContext is only packaged separately to avoid including all of Hive's dependencies in the default
Spark build. If these dependencies are not a problem for your application then using HiveContext
is recommended for the 1.2 release of Spark. Future releases will focus on bringing SQLContext up to
feature parity with a HiveContext.
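
A HiveContext is constructed from the same SparkContext. The sketch below assumes the `spark-hive` artifact is on the classpath, and the table name `src` is purely illustrative:

{% highlight scala %}
import org.apache.spark.sql.hive.HiveContext

val sc: SparkContext // An existing SparkContext.
val hiveContext = new HiveContext(sc)

// Queries on a HiveContext are parsed as HiveQL by default.
hiveContext.sql("SELECT key, value FROM src")
{% endhighlight %}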

</div>

<div data-lang="java" markdown="1">

The entry point into all relational functionality in Spark is the
[JavaSQLContext](api/scala/index.html#org.apache.spark.sql.api.java.JavaSQLContext) class, or one
of its descendants. To create a basic JavaSQLContext, all you need is a JavaSparkContext.

{% highlight java %}
JavaSparkContext sc = ...; // An existing JavaSparkContext.
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);
{% endhighlight %}

In addition to the basic SQLContext, you can also create a HiveContext, which provides a strict
superset of the functionality provided by the basic SQLContext. Additional features include
the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the
ability to read data from Hive tables. To use a HiveContext, you do not need to have an
existing Hive setup, and all of the data sources available to a SQLContext are still available.
HiveContext is only packaged separately to avoid including all of Hive's dependencies in the default
Spark build. If these dependencies are not a problem for your application then using HiveContext
is recommended for the 1.2 release of Spark. Future releases will focus on bringing SQLContext up to
feature parity with a HiveContext.

</div>

<div data-lang="python" markdown="1">

The entry point into all relational functionality in Spark is the
[SQLContext](api/python/pyspark.sql.SQLContext-class.html) class, or one
of its descendants. To create a basic SQLContext, all you need is a SparkContext.

{% highlight python %}
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
{% endhighlight %}

In addition to the basic SQLContext, you can also create a HiveContext, which provides a strict
superset of the functionality provided by the basic SQLContext. Additional features include
the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the
ability to read data from Hive tables. To use a HiveContext, you do not need to have an
existing Hive setup, and all of the data sources available to a SQLContext are still available.
HiveContext is only packaged separately to avoid including all of Hive's dependencies in the default
Spark build. If these dependencies are not a problem for your application then using HiveContext
is recommended for the 1.2 release of Spark. Future releases will focus on bringing SQLContext up to
feature parity with a HiveContext.

</div>

</div>

The specific variant of SQL that is used to parse queries can also be selected using the
`spark.sql.dialect` option. This parameter can be changed using either the `setConf` method on
a SQLContext or by using a `SET key=value` command in SQL. For a SQLContext, the only dialect
available is "sql", which uses a simple SQL parser provided by Spark SQL. In a HiveContext, the
default is "hiveql", though "sql" is also available. Since the HiveQL parser is much more complete,
this is recommended for most use cases.
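
For example, the dialect can be switched either programmatically or from SQL itself. A brief sketch, assuming an existing `sqlContext`:

{% highlight scala %}
// Programmatically, via setConf:
sqlContext.setConf("spark.sql.dialect", "sql")

// Or with a SQL command:
sqlContext.sql("SET spark.sql.dialect=sql")
{% endhighlight %}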

# Data Sources

Spark SQL supports operating on a variety of data sources through the `SchemaRDD` interface.
A SchemaRDD can be operated on as normal RDDs and can also be registered as a temporary table.
Registering a SchemaRDD as a table allows you to run SQL queries over its data. This section
describes the various methods for loading data into a SchemaRDD.

## RDDs

Spark SQL supports two different methods for converting existing RDDs into SchemaRDDs. The first
method uses reflection to infer the schema of an RDD that contains specific types of objects. This
reflection-based approach leads to more concise code and works well when you already know the schema
while writing your Spark application.

The second method for creating SchemaRDDs is through a programmatic interface that allows you to
construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows
you to construct SchemaRDDs when the columns and their types are not known until runtime.

### Inferring the Schema Using Reflection
<div class="codetabs">

<div data-lang="scala" markdown="1">

The Scala interface for Spark SQL supports automatically converting an RDD containing case classes
to a SchemaRDD. The case class
defines the schema of the table. The names of the arguments to the case class are read using
reflection and become the names of the columns. Case classes can also be nested or contain complex
types such as Sequences or Arrays. This RDD can be implicitly converted to a SchemaRDD and then be
registered as a table. Tables can be used in subsequent SQL statements.

{% highlight scala %}
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
{% endhighlight %}

</div>

<div data-lang="java" markdown="1">

Spark SQL supports automatically converting an RDD of [JavaBeans](http://stackoverflow.com/questions/3295496/what-is-a-javabean-exactly)
into a SchemaRDD. The BeanInfo, obtained using reflection, defines the schema of the table.
Currently, Spark SQL does not support JavaBeans that contain
nested or complex types such as Lists or Arrays. You can create a JavaBean by creating a
class that implements Serializable and has getters and setters for all of its fields.

{% highlight java %}

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

{% endhighlight %}

A schema can be applied to an existing RDD by calling `applySchema` and providing the Class object
for the JavaBean.

{% highlight java %}
// sc is an existing JavaSparkContext.
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);

// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
  new Function<String, Person>() {
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");

      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));

      return person;
    }
  });

// Apply a schema to an RDD of JavaBeans and register it as a table.
JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class);
schemaPeople.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

{% endhighlight %}

</div>

<div data-lang="python" markdown="1">

Spark SQL can convert an RDD of Row objects to a SchemaRDD, inferring the data types. Rows are constructed by passing a list of
key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table,
and the types are inferred by looking at the first row. Since we currently only look at the first
row, it is important that there is no missing data in the first row of the RDD. In future versions we
plan to more completely infer the schema by looking at more data, similar to the inference that is
performed on JSON files.

{% highlight python %}
# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the SchemaRDD as a table.
schemaPeople = sqlContext.inferSchema(people)
schemaPeople.registerTempTable("people")

# SQL can be run over SchemaRDDs that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are RDDs and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print teenName
{% endhighlight %}

</div>

</div>

### Programmatically Specifying the Schema

<div class="codetabs">

<div data-lang="scala" markdown="1">

When case classes cannot be defined ahead of time (for example,
the structure of records is encoded in a string, or a text dataset will be parsed
and fields will be projected differently for different users),
a `SchemaRDD` can be created programmatically with three steps.

1. Create an RDD of `Row`s from the original RDD;
2. Create the schema represented by a `StructType` matching the structure of
`Row`s in the RDD created in Step 1.
3. Apply the schema to the RDD of `Row`s via the `applySchema` method provided
by `SQLContext`.

For example:
{% highlight scala %}
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Import Spark SQL data types and Row.
import org.apache.spark.sql._

// Generate the schema from the schema string
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)

// Register the SchemaRDD as a table.
peopleSchemaRDD.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
results.map(t => "Name: " + t(0)).collect().foreach(println)
{% endhighlight %}

</div>

<div data-lang="java" markdown="1">

When JavaBean classes cannot be defined ahead of time (for example,
the structure of records is encoded in a string, or a text dataset will be parsed and
fields will be projected differently for different users),
a `SchemaRDD` can be created programmatically with three steps.

1. Create an RDD of `Row`s from the original RDD;
2. Create the schema represented by a `StructType` matching the structure of
`Row`s in the RDD created in Step 1.
3. Apply the schema to the RDD of `Row`s via the `applySchema` method provided
by `JavaSQLContext`.

For example:
{% highlight java %}
// Import factory methods provided by DataType.
import org.apache.spark.sql.api.java.DataType;
// Import StructType and StructField
import org.apache.spark.sql.api.java.StructType;
import org.apache.spark.sql.api.java.StructField;
// Import Row.
import org.apache.spark.sql.api.java.Row;

// sc is an existing JavaSparkContext.
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);

// Load a text file.
JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema from the schema string
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName: schemaString.split(" ")) {
  fields.add(DataType.createStructField(fieldName, DataType.StringType, true));
}
StructType schema = DataType.createStructType(fields);

// Convert records of the RDD (people) to Rows.
JavaRDD<Row> rowRDD = people.map(
  new Function<String, Row>() {
    public Row call(String record) throws Exception {
      String[] fields = record.split(",");
      return Row.create(fields[0], fields[1].trim());
    }
  });

// Apply the schema to the RDD.
JavaSchemaRDD peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema);

// Register the SchemaRDD as a table.
peopleSchemaRDD.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
JavaSchemaRDD results = sqlContext.sql("SELECT name FROM people");

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

{% endhighlight %}

</div>

<div data-lang="python" markdown="1">

When a dictionary of kwargs cannot be defined ahead of time (for example,
the structure of records is encoded in a string, or a text dataset will be parsed and
fields will be projected differently for different users),
a `SchemaRDD` can be created programmatically with three steps.

1. Create an RDD of tuples or lists from the original RDD;
2. Create the schema represented by a `StructType` matching the structure of
tuples or lists in the RDD created in Step 1.
3. Apply the schema to the RDD via the `applySchema` method provided by `SQLContext`.

For example:
{% highlight python %}
# Import SQLContext and data types
from pyspark.sql import *

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a tuple.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = sqlContext.applySchema(people, schema)

# Register the SchemaRDD as a table.
schemaPeople.registerTempTable("people")

# SQL can be run over SchemaRDDs that have been registered as a table.
results = sqlContext.sql("SELECT name FROM people")

# The results of SQL queries are RDDs and support all the normal RDD operations.
names = results.map(lambda p: "Name: " + p.name)
for name in names.collect():
  print name
{% endhighlight %}

</div>

</div>

## Parquet Files

[Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems.
Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema
of the original data.

### Loading Data Programmatically

Using the data from the above example:

<div class="codetabs">

<div data-lang="scala" markdown="1">

{% highlight scala %}
// sqlContext from the previous example is used in this example.
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")

// Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a SchemaRDD.
val parquetFile = sqlContext.parquetFile("people.parquet")

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
{% endhighlight %}

</div>

<div data-lang="java" markdown="1">

{% highlight java %}
// sqlContext from the previous example is used in this example.

JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example.

// JavaSchemaRDDs can be saved as Parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile("people.parquet");

// Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a JavaSchemaRDD.
JavaSchemaRDD parquetFile = sqlContext.parquetFile("people.parquet");

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();
{% endhighlight %}

</div>

<div data-lang="python" markdown="1">

{% highlight python %}
# sqlContext from the previous example is used in this example.

schemaPeople # The SchemaRDD from the previous example.

# SchemaRDDs can be saved as Parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile("people.parquet")

# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a Parquet file is also a SchemaRDD.
parquetFile = sqlContext.parquetFile("people.parquet")

# Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print teenName
{% endhighlight %}

</div>

</div>

### Configuration

Configuration of Parquet can be done using the `setConf` method on SQLContext or by running
`SET key=value` commands using SQL.
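
For instance, to switch the write-side compression codec (a sketch, assuming an existing `sqlContext`; the property is listed in the table below):

{% highlight scala %}
// Programmatically, via setConf:
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

// Or with a SQL command:
sqlContext.sql("SET spark.sql.parquet.compression.codec=snappy")
{% endhighlight %}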

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
  <td><code>spark.sql.parquet.binaryAsString</code></td>
  <td>false</td>
  <td>
    Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do
    not differentiate between binary data and strings when writing out the Parquet schema. This
    flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
  </td>
</tr>
<tr>
  <td><code>spark.sql.parquet.int96AsTimestamp</code></td>
  <td>true</td>
  <td>
    Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. Spark also
    stores Timestamp as INT96 to avoid losing precision in the nanoseconds field. This
    flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
  </td>
</tr>
<tr>
  <td><code>spark.sql.parquet.cacheMetadata</code></td>
  <td>true</td>
  <td>
    Turns on caching of Parquet schema metadata. Can speed up querying of static data.
  </td>
</tr>
<tr>
  <td><code>spark.sql.parquet.compression.codec</code></td>
  <td>gzip</td>
  <td>
    Sets the compression codec used when writing Parquet files. Acceptable values include:
    uncompressed, snappy, gzip, lzo.
  </td>
</tr>
<tr>
  <td><code>spark.sql.parquet.filterPushdown</code></td>
  <td>false</td>
  <td>
    Turn on Parquet filter pushdown optimization. This feature is turned off by default because of a known
    bug in Parquet 1.6.0rc3 (<a href="https://issues.apache.org/jira/browse/PARQUET-136">PARQUET-136</a>).
    However, if your table doesn't contain any nullable string or binary columns, it's still safe to turn
    this feature on.
  </td>
</tr>
<tr>
  <td><code>spark.sql.hive.convertMetastoreParquet</code></td>
  <td>true</td>
  <td>
    When set to false, Spark SQL will use the Hive SerDe for Parquet tables instead of the built-in
    support.
  </td>
</tr>
</table>

## JSON Datasets
<div class="codetabs">

<div data-lang="scala" markdown="1">
Spark SQL can automatically infer the schema of a JSON dataset and load it as a SchemaRDD.
This conversion can be done using one of two methods in a SQLContext:

* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.

Note that the file that is offered as _jsonFile_ is not a typical JSON file. Each
line must contain a separate, self-contained valid JSON object. As a consequence,
a regular multi-line JSON file will most often fail.
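
To illustrate the expected line-delimited layout, a valid input file looks roughly like this (an illustrative fragment, in the spirit of the bundled `people.json`):

{% highlight json %}
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
{% endhighlight %}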

{% highlight scala %}
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = "examples/src/main/resources/people.json"
// Create a SchemaRDD from the file(s) pointed to by path
val people = sqlContext.jsonFile(path)

// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// Register this SchemaRDD as a table.
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Alternatively, a SchemaRDD can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
{% endhighlight %}

</div>

<div data-lang="java" markdown="1">
Spark SQL can automatically infer the schema of a JSON dataset and load it as a JavaSchemaRDD.
This conversion can be done using one of two methods in a JavaSQLContext:

* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.

Note that the file that is offered as _jsonFile_ is not a typical JSON file. Each
line must contain a separate, self-contained valid JSON object. As a consequence,
a regular multi-line JSON file will most often fail.

{% highlight java %}
// sc is an existing JavaSparkContext.
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
String path = "examples/src/main/resources/people.json";
// Create a JavaSchemaRDD from the file(s) pointed to by path
JavaSchemaRDD people = sqlContext.jsonFile(path);

// The inferred schema can be visualized using the printSchema() method.
people.printSchema();
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// Register this JavaSchemaRDD as a table.
people.registerTempTable("people");

// SQL statements can be run by using the sql methods provided by sqlContext.
JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
  "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
JavaSchemaRDD anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD);
{% endhighlight %}
</div>
|
||
|
||
<div data-lang="python" markdown="1">
Spark SQL can automatically infer the schema of a JSON dataset and load it as a SchemaRDD.
This conversion can be done using one of two methods in a SQLContext:

* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.

Note that the file that is offered as _jsonFile_ is not a typical JSON file. Each
line must contain a separate, self-contained valid JSON object. As a consequence,
a regular multi-line JSON file will most often fail.

{% highlight python %}
# sc is an existing SparkContext.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files.
path = "examples/src/main/resources/people.json"
# Create a SchemaRDD from the file(s) pointed to by path
people = sqlContext.jsonFile(path)

# The inferred schema can be visualized using the printSchema() method.
people.printSchema()
# root
#  |-- age: integer (nullable = true)
#  |-- name: string (nullable = true)

# Register this SchemaRDD as a table.
people.registerTempTable("people")

# SQL statements can be run by using the sql methods provided by sqlContext.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# Alternatively, a SchemaRDD can be created for a JSON dataset represented by
# an RDD[String] storing one JSON object per string.
anotherPeopleRDD = sc.parallelize([
  '{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}'])
anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
{% endhighlight %}
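The line-per-object requirement can be illustrated with nothing but the standard library. This sketch (plain Python, no Spark required) shows why each line must parse on its own, and why a pretty-printed multi-line JSON file fails:

```python
import json

# Two records in the line-delimited form that jsonFile expects:
# one self-contained JSON object per line.
json_lines = '{"name": "Michael"}\n{"name": "Andy", "age": 30}'

# Each line parses independently.
records = [json.loads(line) for line in json_lines.splitlines()]
print(records[1]["age"])  # 30

# A pretty-printed (multi-line) JSON object breaks this per-line model:
# its first line, "{", is not valid JSON on its own.
pretty = '{\n  "name": "Michael"\n}'
try:
    json.loads(pretty.splitlines()[0])
except json.JSONDecodeError:
    print("first line alone is not valid JSON")
```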
</div>

</div>

## Hive Tables

Spark SQL also supports reading and writing data stored in [Apache Hive](http://hive.apache.org/).
However, since Hive has a large number of dependencies, it is not included in the default Spark assembly.
Hive support is enabled by adding the `-Phive` and `-Phive-thriftserver` flags to Spark's build.
Building with these flags creates a new assembly jar that includes Hive. Note that this Hive assembly jar must also be present
on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries
(SerDes) in order to access data stored in Hive.

Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.

<div class="codetabs">

<div data-lang="scala" markdown="1">

When working with Hive one must construct a `HiveContext`, which inherits from `SQLContext`, and
adds support for finding tables in the MetaStore and writing queries using HiveQL. Users who do
not have an existing Hive deployment can still create a `HiveContext`. When not configured by
`hive-site.xml`, the context automatically creates `metastore_db` and `warehouse` in the current
directory.

{% highlight scala %}
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
{% endhighlight %}

</div>

<div data-lang="java" markdown="1">

When working with Hive one must construct a `JavaHiveContext`, which inherits from `JavaSQLContext`, and
adds support for finding tables in the MetaStore and writing queries using HiveQL. In addition to
the `sql` method, a `JavaHiveContext` also provides an `hql` method, which allows queries to be
expressed in HiveQL.

{% highlight java %}
// sc is an existing JavaSparkContext.
JavaHiveContext sqlContext = new org.apache.spark.sql.hive.api.java.JavaHiveContext(sc);

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL.
Row[] results = sqlContext.sql("FROM src SELECT key, value").collect();

{% endhighlight %}

</div>

<div data-lang="python" markdown="1">

When working with Hive one must construct a `HiveContext`, which inherits from `SQLContext`, and
adds support for finding tables in the MetaStore and writing queries using HiveQL. In addition to
the `sql` method, a `HiveContext` also provides an `hql` method, which allows queries to be
expressed in HiveQL.

{% highlight python %}
# sc is an existing SparkContext.
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results = sqlContext.sql("FROM src SELECT key, value").collect()

{% endhighlight %}

</div>
</div>

# Performance Tuning

For some workloads it is possible to improve performance by either caching data in memory, or by
turning on some experimental options.

## Caching Data In Memory

Spark SQL can cache tables using an in-memory columnar format by calling `sqlContext.cacheTable("tableName")` or `schemaRDD.cache()`.
Then Spark SQL will scan only the required columns and will automatically tune compression to minimize
memory usage and GC pressure. You can call `sqlContext.uncacheTable("tableName")` to remove the table from memory.

Configuration of in-memory caching can be done using the `setConf` method on SQLContext or by running
`SET key=value` commands using SQL.

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
  <td><code>spark.sql.inMemoryColumnarStorage.compressed</code></td>
  <td>true</td>
  <td>
    When set to true, Spark SQL will automatically select a compression codec for each column based
    on statistics of the data.
  </td>
</tr>
<tr>
  <td><code>spark.sql.inMemoryColumnarStorage.batchSize</code></td>
  <td>10000</td>
  <td>
    Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization
    and compression, but risk OOMs when caching data.
  </td>
</tr>
</table>
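To see why larger batches risk OOMs, a rough back-of-envelope helps. This is illustrative arithmetic only (plain Python, not a Spark API); it estimates the uncompressed footprint of one column batch before compression kicks in:

```python
# Estimated uncompressed bytes for one cached column batch:
# rows in the batch times the fixed width of each value.
def batch_bytes(batch_size, bytes_per_value):
    return batch_size * bytes_per_value

# Default batch of 10000 rows of 8-byte LongType values: ~80 KB per column.
print(batch_bytes(10000, 8))    # 80000

# Raising batchSize to 1,000,000 means ~8 MB per column must be buffered
# while each batch is built, multiplied across columns and partitions --
# which is where the OOM risk comes from.
print(batch_bytes(1000000, 8))  # 8000000
```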

## Other Configuration Options

The following options can also be used to tune the performance of query execution. It is possible
that these options will be deprecated in a future release as more optimizations are performed automatically.

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
  <td><code>spark.sql.autoBroadcastJoinThreshold</code></td>
  <td>10485760 (10 MB)</td>
  <td>
    Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when
    performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently
    statistics are only supported for Hive Metastore tables where the command
    <code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan</code> has been run.
  </td>
</tr>
<tr>
  <td><code>spark.sql.codegen</code></td>
  <td>false</td>
  <td>
    When true, code will be dynamically generated at runtime for expression evaluation in a specific
    query. For some queries with complicated expressions this option can lead to significant speed-ups.
    However, for simple queries this can actually slow down query execution.
  </td>
</tr>
<tr>
  <td><code>spark.sql.shuffle.partitions</code></td>
  <td>200</td>
  <td>
    Configures the number of partitions to use when shuffling data for joins or aggregations.
  </td>
</tr>
</table>

# Other SQL Interfaces

Spark SQL also supports interfaces for running SQL queries directly without the need to write any
code.

## Running the Thrift JDBC/ODBC server

The Thrift JDBC/ODBC server implemented here corresponds to the [`HiveServer2`](https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2)
in Hive 0.12. You can test the JDBC server with the beeline script that comes with either Spark or Hive 0.12.

To start the JDBC/ODBC server, run the following in the Spark directory:

    ./sbin/start-thriftserver.sh

This script accepts all `bin/spark-submit` command line options, plus a `--hiveconf` option to
specify Hive properties. You may run `./sbin/start-thriftserver.sh --help` for a complete list of
all available options. By default, the server listens on localhost:10000. You may override this
behaviour via environment variables, i.e.:

{% highlight bash %}
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...
{% endhighlight %}

or system properties:

{% highlight bash %}
./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...
{% endhighlight %}

Now you can use beeline to test the Thrift JDBC/ODBC server:

    ./bin/beeline

Connect to the JDBC/ODBC server in beeline with:

    beeline> !connect jdbc:hive2://localhost:10000

Beeline will ask you for a username and password. In non-secure mode, simply enter the username on
your machine and a blank password. For secure mode, please follow the instructions given in the
[beeline documentation](https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients).

Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.

You may also use the beeline script that comes with Hive.

The Thrift JDBC server also supports sending thrift RPC messages over HTTP transport.
Use the following settings to enable HTTP mode, either as system properties or in the `hive-site.xml` file in `conf/`:

    hive.server2.transport.mode - Set this to value: http
    hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
    hive.server2.http.endpoint - HTTP endpoint; default is cliservice
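The same settings can be sketched as `hive-site.xml` entries (property names taken from the list above; values shown are the documented defaults except for the transport mode):

```xml
<property>
  <name>hive.server2.transport.mode</name>
  <value>http</value>
</property>
<property>
  <name>hive.server2.thrift.http.port</name>
  <value>10001</value>
</property>
```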

To test, use beeline to connect to the JDBC/ODBC server in http mode with:

    beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

## Running the Spark SQL CLI

The Spark SQL CLI is a convenient tool to run the Hive metastore service in local mode and execute
queries input from the command line. Note that the Spark SQL CLI cannot talk to the Thrift JDBC server.

To start the Spark SQL CLI, run the following in the Spark directory:

    ./bin/spark-sql

Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.
You may run `./bin/spark-sql --help` for a complete list of all available
options.

# Compatibility with Other Systems

## Migration Guide for Shark Users

### Scheduling
To set a [Fair Scheduler](job-scheduling.html#fair-scheduler-pools) pool for a JDBC client session,
users can set the `spark.sql.thriftserver.scheduler.pool` variable:

    SET spark.sql.thriftserver.scheduler.pool=accounting;

### Reducer number

In Shark, the default reducer number is 1 and is controlled by the property `mapred.reduce.tasks`. Spark
SQL deprecates this property in favor of `spark.sql.shuffle.partitions`, whose default value
is 200. Users may customize this property via `SET`:

    SET spark.sql.shuffle.partitions=10;
    SELECT page, count(*) c
    FROM logs_last_month_cached
    GROUP BY page ORDER BY c DESC LIMIT 10;

You may also put this property in `hive-site.xml` to override the default value.

For now, the `mapred.reduce.tasks` property is still recognized, and is converted to
`spark.sql.shuffle.partitions` automatically.

### Caching

The `shark.cache` table property no longer exists, and tables whose names end with `_cached` are no
longer automatically cached. Instead, we provide `CACHE TABLE` and `UNCACHE TABLE` statements to
let users control table caching explicitly:

    CACHE TABLE logs_last_month;
    UNCACHE TABLE logs_last_month;

**NOTE:** `CACHE TABLE tbl` is now __eager__ by default, not __lazy__. You no longer need to trigger cache materialization manually.

Since Spark 1.2.0, Spark SQL provides a statement that lets users control whether table caching is eager or lazy:

    CACHE [LAZY] TABLE [AS SELECT] ...

Several caching related features are not supported yet:

* User defined partition level cache eviction policy
* RDD reloading
* In-memory cache write through policy

## Compatibility with Apache Hive

Spark SQL is designed to be compatible with the Hive Metastore, SerDes and UDFs. Currently Spark
SQL is based on Hive 0.12.0 and 0.13.1.

#### Deploying in Existing Hive Warehouses

The Spark SQL Thrift JDBC server is designed to be "out of the box" compatible with existing Hive
installations. You do not need to modify your existing Hive Metastore or change the data placement
or partitioning of your tables.

### Supported Hive Features

Spark SQL supports the vast majority of Hive features, such as:

* Hive query statements, including:
  * `SELECT`
  * `GROUP BY`
  * `ORDER BY`
  * `CLUSTER BY`
  * `SORT BY`
* All Hive operators, including:
  * Relational operators (`=`, `<=>`, `==`, `<>`, `<`, `>`, `>=`, `<=`, etc)
  * Arithmetic operators (`+`, `-`, `*`, `/`, `%`, etc)
  * Logical operators (`AND`, `&&`, `OR`, `||`, etc)
  * Complex type constructors
  * Mathematical functions (`sign`, `ln`, `cos`, etc)
  * String functions (`instr`, `length`, `printf`, etc)
* User defined functions (UDF)
* User defined aggregation functions (UDAF)
* User defined serialization formats (SerDes)
* Joins
  * `JOIN`
  * `{LEFT|RIGHT|FULL} OUTER JOIN`
  * `LEFT SEMI JOIN`
  * `CROSS JOIN`
* Unions
* Sub-queries
  * `SELECT col FROM ( SELECT a + b AS col from t1) t2`
* Sampling
* Explain
* Partitioned tables
* View
* All Hive DDL Functions, including:
  * `CREATE TABLE`
  * `CREATE TABLE AS SELECT`
  * `ALTER TABLE`
* Most Hive Data types, including:
  * `TINYINT`
  * `SMALLINT`
  * `INT`
  * `BIGINT`
  * `BOOLEAN`
  * `FLOAT`
  * `DOUBLE`
  * `STRING`
  * `BINARY`
  * `TIMESTAMP`
  * `DATE`
  * `ARRAY<>`
  * `MAP<>`
  * `STRUCT<>`

### Unsupported Hive Functionality

Below is a list of Hive features that we don't support yet. Most of these features are rarely used
in Hive deployments.

**Major Hive Features**

* Tables with buckets: bucket is the hash partitioning within a Hive table partition. Spark SQL
  doesn't support buckets yet.

**Esoteric Hive Features**

* Tables with partitions using different input formats: In Spark SQL, all table partitions need to
  have the same input format.
* Non-equi outer join: For the uncommon use case of using outer joins with non-equi join conditions
  (e.g. condition "`key < 10`"), Spark SQL will output wrong results for the `NULL` tuple.
* `UNION` type
* Unique join
* Single query multi insert
* Column statistics collecting: Spark SQL does not piggyback scans to collect column statistics at
  the moment and only supports populating the sizeInBytes field of the hive metastore.

**Hive Input/Output Formats**

* File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat.
* Hadoop archive

**Hive Optimizations**

A handful of Hive optimizations are not yet included in Spark. Some of these (such as indexes) are
less important due to Spark SQL's in-memory computational model. Others are slotted for future
releases of Spark SQL.

* Block level bitmap indexes and virtual columns (used to build indexes)
* Automatically convert a join to map join: For joining a large table with multiple small tables,
  Hive automatically converts the join into a map join. We are adding this auto conversion in the
  next release.
* Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you
  need to control the degree of parallelism post-shuffle using "`SET spark.sql.shuffle.partitions=[num_tasks];`".
* Meta-data only query: For queries that can be answered by using only meta data, Spark SQL still
  launches tasks to compute the result.
* Skew data flag: Spark SQL does not follow the skew data flags in Hive.
* `STREAMTABLE` hint in join: Spark SQL does not follow the `STREAMTABLE` hint.
* Merge multiple small files for query results: if the result output contains multiple small files,
  Hive can optionally merge the small files into fewer large files to avoid overflowing the HDFS
  metadata. Spark SQL does not support that.

# Writing Language-Integrated Relational Queries

**Language-Integrated queries are experimental and currently only supported in Scala.**

Spark SQL also supports a domain specific language for writing queries. Once again,
using the data from the above examples:

{% highlight scala %}
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
import sqlContext._
val people: RDD[Person] = ... // An RDD of case class objects, from the first example.

// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
val teenagers = people.where('age >= 10).where('age <= 19).select('name)
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
{% endhighlight %}

The DSL uses Scala symbols to represent columns in the underlying table, which are identifiers
prefixed with a tick (`'`). Implicit conversions turn these symbols into expressions that are
evaluated by the SQL execution engine. A full list of the functions supported can be found in the
[ScalaDoc](api/scala/index.html#org.apache.spark.sql.SchemaRDD).

<!-- TODO: Include the table of operations here. -->

# Spark SQL DataType Reference

* Numeric types
    - `ByteType`: Represents 1-byte signed integer numbers.
      The range of numbers is from `-128` to `127`.
    - `ShortType`: Represents 2-byte signed integer numbers.
      The range of numbers is from `-32768` to `32767`.
    - `IntegerType`: Represents 4-byte signed integer numbers.
      The range of numbers is from `-2147483648` to `2147483647`.
    - `LongType`: Represents 8-byte signed integer numbers.
      The range of numbers is from `-9223372036854775808` to `9223372036854775807`.
    - `FloatType`: Represents 4-byte single-precision floating point numbers.
    - `DoubleType`: Represents 8-byte double-precision floating point numbers.
    - `DecimalType`: Represents arbitrary-precision signed decimal numbers. Backed internally by `java.math.BigDecimal`. A `BigDecimal` consists of an arbitrary precision integer unscaled value and a 32-bit integer scale.
* String type
    - `StringType`: Represents character string values.
* Binary type
    - `BinaryType`: Represents byte sequence values.
* Boolean type
    - `BooleanType`: Represents boolean values.
* Datetime type
    - `TimestampType`: Represents values comprising values of fields year, month, day,
      hour, minute, and second.
    - `DateType`: Represents values comprising values of fields year, month, day.
* Complex types
    - `ArrayType(elementType, containsNull)`: Represents values comprising a sequence of
      elements with the type of `elementType`. `containsNull` is used to indicate if
      elements in an `ArrayType` value can have `null` values.
    - `MapType(keyType, valueType, valueContainsNull)`:
      Represents values comprising a set of key-value pairs. The data type of keys is
      described by `keyType` and the data type of values is described by `valueType`.
      For a `MapType` value, keys are not allowed to have `null` values. `valueContainsNull`
      is used to indicate if values of a `MapType` value can have `null` values.
    - `StructType(fields)`: Represents values with the structure described by
      a sequence of `StructField`s (`fields`).
        * `StructField(name, dataType, nullable)`: Represents a field in a `StructType`.
          The name of a field is indicated by `name`. The data type of a field is indicated
          by `dataType`. `nullable` is used to indicate if values of this field can have
          `null` values.
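The fixed-width integer ranges above follow directly from two's-complement storage: an N-byte signed type covers `-2^(8N-1)` to `2^(8N-1) - 1`. A quick standard-library check (plain Python, no Spark required):

```python
import struct

# Range of an N-byte signed integer type (ByteType=1, ShortType=2, ...).
def signed_range(n_bytes):
    return -2 ** (8 * n_bytes - 1), 2 ** (8 * n_bytes - 1) - 1

print(signed_range(1))  # ByteType:    (-128, 127)
print(signed_range(4))  # IntegerType: (-2147483648, 2147483647)

# struct packs exactly these widths; out-of-range values are rejected.
struct.pack("b", 127)        # fits in ByteType's single byte
try:
    struct.pack("b", 128)    # one past the ByteType maximum
except struct.error:
    print("128 does not fit in a 1-byte signed integer")
```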

<div class="codetabs">
<div data-lang="scala" markdown="1">

All data types of Spark SQL are located in the package `org.apache.spark.sql`.
You can access them by doing
{% highlight scala %}
import org.apache.spark.sql._
{% endhighlight %}

<table class="table">
<tr>
  <th style="width:20%">Data type</th>
  <th style="width:40%">Value type in Scala</th>
  <th>API to access or create a data type</th></tr>
<tr>
  <td> <b>ByteType</b> </td>
  <td> Byte </td>
  <td> ByteType </td>
</tr>
<tr>
  <td> <b>ShortType</b> </td>
  <td> Short </td>
  <td> ShortType </td>
</tr>
<tr>
  <td> <b>IntegerType</b> </td>
  <td> Int </td>
  <td> IntegerType </td>
</tr>
<tr>
  <td> <b>LongType</b> </td>
  <td> Long </td>
  <td> LongType </td>
</tr>
<tr>
  <td> <b>FloatType</b> </td>
  <td> Float </td>
  <td> FloatType </td>
</tr>
<tr>
  <td> <b>DoubleType</b> </td>
  <td> Double </td>
  <td> DoubleType </td>
</tr>
<tr>
  <td> <b>DecimalType</b> </td>
  <td> scala.math.BigDecimal </td>
  <td> DecimalType </td>
</tr>
<tr>
  <td> <b>StringType</b> </td>
  <td> String </td>
  <td> StringType </td>
</tr>
<tr>
  <td> <b>BinaryType</b> </td>
  <td> Array[Byte] </td>
  <td> BinaryType </td>
</tr>
<tr>
  <td> <b>BooleanType</b> </td>
  <td> Boolean </td>
  <td> BooleanType </td>
</tr>
<tr>
  <td> <b>TimestampType</b> </td>
  <td> java.sql.Timestamp </td>
  <td> TimestampType </td>
</tr>
<tr>
  <td> <b>DateType</b> </td>
  <td> java.sql.Date </td>
  <td> DateType </td>
</tr>
<tr>
  <td> <b>ArrayType</b> </td>
  <td> scala.collection.Seq </td>
  <td>
    ArrayType(<i>elementType</i>, [<i>containsNull</i>])<br />
    <b>Note:</b> The default value of <i>containsNull</i> is <i>true</i>.
  </td>
</tr>
<tr>
  <td> <b>MapType</b> </td>
  <td> scala.collection.Map </td>
  <td>
    MapType(<i>keyType</i>, <i>valueType</i>, [<i>valueContainsNull</i>])<br />
    <b>Note:</b> The default value of <i>valueContainsNull</i> is <i>true</i>.
  </td>
</tr>
<tr>
  <td> <b>StructType</b> </td>
  <td> org.apache.spark.sql.Row </td>
  <td>
    StructType(<i>fields</i>)<br />
    <b>Note:</b> <i>fields</i> is a Seq of StructFields. Also, two fields with the same
    name are not allowed.
  </td>
</tr>
<tr>
  <td> <b>StructField</b> </td>
  <td> The value type in Scala of the data type of this field
  (For example, Int for a StructField with the data type IntegerType) </td>
  <td>
    StructField(<i>name</i>, <i>dataType</i>, <i>nullable</i>)
  </td>
</tr>
</table>

</div>

<div data-lang="java" markdown="1">

All data types of Spark SQL are located in the package
`org.apache.spark.sql.types`. To access or create a data type,
please use factory methods provided in
`org.apache.spark.sql.types.DataTypes`.

<table class="table">
<tr>
  <th style="width:20%">Data type</th>
  <th style="width:40%">Value type in Java</th>
  <th>API to access or create a data type</th></tr>
<tr>
  <td> <b>ByteType</b> </td>
  <td> byte or Byte </td>
  <td> DataTypes.ByteType </td>
</tr>
<tr>
  <td> <b>ShortType</b> </td>
  <td> short or Short </td>
  <td> DataTypes.ShortType </td>
</tr>
<tr>
  <td> <b>IntegerType</b> </td>
  <td> int or Integer </td>
  <td> DataTypes.IntegerType </td>
</tr>
<tr>
  <td> <b>LongType</b> </td>
  <td> long or Long </td>
  <td> DataTypes.LongType </td>
</tr>
<tr>
  <td> <b>FloatType</b> </td>
  <td> float or Float </td>
  <td> DataTypes.FloatType </td>
</tr>
<tr>
  <td> <b>DoubleType</b> </td>
  <td> double or Double </td>
  <td> DataTypes.DoubleType </td>
</tr>
<tr>
  <td> <b>DecimalType</b> </td>
  <td> java.math.BigDecimal </td>
  <td>
    DataTypes.createDecimalType()<br />
    DataTypes.createDecimalType(<i>precision</i>, <i>scale</i>).
  </td>
</tr>
<tr>
  <td> <b>StringType</b> </td>
  <td> String </td>
  <td> DataTypes.StringType </td>
</tr>
<tr>
  <td> <b>BinaryType</b> </td>
  <td> byte[] </td>
  <td> DataTypes.BinaryType </td>
</tr>
<tr>
  <td> <b>BooleanType</b> </td>
  <td> boolean or Boolean </td>
  <td> DataTypes.BooleanType </td>
</tr>
<tr>
  <td> <b>TimestampType</b> </td>
  <td> java.sql.Timestamp </td>
  <td> DataTypes.TimestampType </td>
</tr>
<tr>
  <td> <b>DateType</b> </td>
  <td> java.sql.Date </td>
  <td> DataTypes.DateType </td>
</tr>
<tr>
  <td> <b>ArrayType</b> </td>
  <td> java.util.List </td>
  <td>
    DataTypes.createArrayType(<i>elementType</i>)<br />
    <b>Note:</b> The value of <i>containsNull</i> will be <i>true</i>.<br />
    DataTypes.createArrayType(<i>elementType</i>, <i>containsNull</i>).
  </td>
</tr>
<tr>
  <td> <b>MapType</b> </td>
  <td> java.util.Map </td>
  <td>
    DataTypes.createMapType(<i>keyType</i>, <i>valueType</i>)<br />
    <b>Note:</b> The value of <i>valueContainsNull</i> will be <i>true</i>.<br />
    DataTypes.createMapType(<i>keyType</i>, <i>valueType</i>, <i>valueContainsNull</i>)<br />
  </td>
</tr>
<tr>
  <td> <b>StructType</b> </td>
  <td> org.apache.spark.sql.api.java.Row </td>
  <td>
    DataTypes.createStructType(<i>fields</i>)<br />
    <b>Note:</b> <i>fields</i> is a List or an array of StructFields.
    Also, two fields with the same name are not allowed.
  </td>
</tr>
<tr>
  <td> <b>StructField</b> </td>
  <td> The value type in Java of the data type of this field
  (For example, int for a StructField with the data type IntegerType) </td>
  <td>
    DataTypes.createStructField(<i>name</i>, <i>dataType</i>, <i>nullable</i>)
  </td>
</tr>
</table>

</div>

<div data-lang="python" markdown="1">

All data types of Spark SQL are located in the package `pyspark.sql`.
You can access them by doing
{% highlight python %}
from pyspark.sql import *
{% endhighlight %}

<table class="table">
<tr>
  <th style="width:20%">Data type</th>
  <th style="width:40%">Value type in Python</th>
  <th>API to access or create a data type</th></tr>
<tr>
  <td> <b>ByteType</b> </td>
  <td>
    int or long <br />
    <b>Note:</b> Numbers will be converted to 1-byte signed integer numbers at runtime.
    Please make sure that numbers are within the range of -128 to 127.
  </td>
  <td> ByteType() </td>
</tr>
<tr>
  <td> <b>ShortType</b> </td>
  <td>
    int or long <br />
    <b>Note:</b> Numbers will be converted to 2-byte signed integer numbers at runtime.
    Please make sure that numbers are within the range of -32768 to 32767.
  </td>
  <td> ShortType() </td>
</tr>
<tr>
  <td> <b>IntegerType</b> </td>
  <td> int or long </td>
  <td> IntegerType() </td>
</tr>
<tr>
  <td> <b>LongType</b> </td>
  <td>
    long <br />
    <b>Note:</b> Numbers will be converted to 8-byte signed integer numbers at runtime.
    Please make sure that numbers are within the range of
    -9223372036854775808 to 9223372036854775807.
    Otherwise, please convert data to decimal.Decimal and use DecimalType.
  </td>
  <td> LongType() </td>
</tr>
<tr>
  <td> <b>FloatType</b> </td>
  <td>
    float <br />
    <b>Note:</b> Numbers will be converted to 4-byte single-precision floating
    point numbers at runtime.
  </td>
  <td> FloatType() </td>
</tr>
<tr>
  <td> <b>DoubleType</b> </td>
  <td> float </td>
  <td> DoubleType() </td>
</tr>
<tr>
  <td> <b>DecimalType</b> </td>
  <td> decimal.Decimal </td>
  <td> DecimalType() </td>
</tr>
<tr>
  <td> <b>StringType</b> </td>
  <td> string </td>
  <td> StringType() </td>
</tr>
<tr>
  <td> <b>BinaryType</b> </td>
  <td> bytearray </td>
  <td> BinaryType() </td>
</tr>
<tr>
  <td> <b>BooleanType</b> </td>
  <td> bool </td>
  <td> BooleanType() </td>
</tr>
<tr>
  <td> <b>TimestampType</b> </td>
  <td> datetime.datetime </td>
  <td> TimestampType() </td>
</tr>
<tr>
  <td> <b>DateType</b> </td>
  <td> datetime.date </td>
  <td> DateType() </td>
</tr>
<tr>
  <td> <b>ArrayType</b> </td>
  <td> list, tuple, or array </td>
  <td>
    ArrayType(<i>elementType</i>, [<i>containsNull</i>])<br />
    <b>Note:</b> The default value of <i>containsNull</i> is <i>True</i>.
  </td>
</tr>
<tr>
  <td> <b>MapType</b> </td>
  <td> dict </td>
  <td>
    MapType(<i>keyType</i>, <i>valueType</i>, [<i>valueContainsNull</i>])<br />
    <b>Note:</b> The default value of <i>valueContainsNull</i> is <i>True</i>.
  </td>
</tr>
<tr>
  <td> <b>StructType</b> </td>
  <td> list or tuple </td>
  <td>
    StructType(<i>fields</i>)<br />
    <b>Note:</b> <i>fields</i> is a list of StructFields. Also, two fields with the same
    name are not allowed.
  </td>
</tr>
<tr>
  <td> <b>StructField</b> </td>
  <td> The value type in Python of the data type of this field
  (For example, int for a StructField with the data type IntegerType) </td>
  <td>
    StructField(<i>name</i>, <i>dataType</i>, <i>nullable</i>)
  </td>
</tr>
</table>
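The Python value types in the table map onto plain standard-library values. As a sketch (no Spark required; the field names here are made up for illustration), a row's fields would be built from these types:

```python
import datetime
import decimal

# One value per Spark SQL type, using the Python type the table pairs it with.
row = {
    "name": "Yin",                                     # StringType    -> str
    "age": 30,                                         # IntegerType   -> int
    "height": 1.75,                                    # DoubleType    -> float
    "balance": decimal.Decimal("19.99"),               # DecimalType   -> decimal.Decimal
    "data": bytearray(b"\x00\x01"),                    # BinaryType    -> bytearray
    "birthday": datetime.date(1985, 1, 1),             # DateType      -> datetime.date
    "updated": datetime.datetime(2015, 2, 6, 12, 0),   # TimestampType -> datetime.datetime
    "tags": ["a", "b"],                                # ArrayType     -> list
    "attrs": {"city": "Columbus"},                     # MapType       -> dict
}
print(type(row["birthday"]).__name__)  # date
```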

</div>

</div>