[SPARK-5310] [SQL] [DOC] Parquet section for the SQL programming guide

Also fixed a bunch of minor styling issues.


Author: Cheng Lian <lian@databricks.com>

Closes #5001 from liancheng/parquet-doc and squashes the following commits:

89ad3db [Cheng Lian] Addresses @rxin's comments
7eb6955 [Cheng Lian] Docs for the new Parquet data source
415eefb [Cheng Lian] Some minor formatting improvements
Cheng Lian 2015-03-13 21:34:50 +08:00
parent 0af9ea74a0
commit 69ff8e8cfb

@@ -21,14 +21,14 @@ The DataFrame API is available in [Scala](api/scala/index.html#org.apache.spark.
All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell` or the `pyspark` shell.
## Starting Point: SQLContext
## Starting Point: `SQLContext`
<div class="codetabs">
<div data-lang="scala" markdown="1">
The entry point into all functionality in Spark SQL is the
[SQLContext](api/scala/index.html#org.apache.spark.sql.SQLContext) class, or one of its
descendants. To create a basic SQLContext, all you need is a SparkContext.
[`SQLContext`](api/scala/index.html#org.apache.spark.sql.SQLContext) class, or one of its
descendants. To create a basic `SQLContext`, all you need is a SparkContext.
{% highlight scala %}
val sc: SparkContext // An existing SparkContext.
@@ -43,8 +43,8 @@ import sqlContext.implicits._
<div data-lang="java" markdown="1">
The entry point into all functionality in Spark SQL is the
[SQLContext](api/java/index.html#org.apache.spark.sql.SQLContext) class, or one of its
descendants. To create a basic SQLContext, all you need is a SparkContext.
[`SQLContext`](api/java/index.html#org.apache.spark.sql.SQLContext) class, or one of its
descendants. To create a basic `SQLContext`, all you need is a SparkContext.
{% highlight java %}
JavaSparkContext sc = ...; // An existing JavaSparkContext.
@@ -56,8 +56,8 @@ SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
<div data-lang="python" markdown="1">
The entry point into all relational functionality in Spark is the
[SQLContext](api/python/pyspark.sql.SQLContext-class.html) class, or one
of its decedents. To create a basic SQLContext, all you need is a SparkContext.
[`SQLContext`](api/python/pyspark.sql.SQLContext-class.html) class, or one
of its descendants. To create a basic `SQLContext`, all you need is a SparkContext.
{% highlight python %}
from pyspark.sql import SQLContext
@@ -67,20 +67,20 @@ sqlContext = SQLContext(sc)
</div>
</div>
In addition to the basic SQLContext, you can also create a HiveContext, which provides a
superset of the functionality provided by the basic SQLContext. Additional features include
In addition to the basic `SQLContext`, you can also create a `HiveContext`, which provides a
superset of the functionality provided by the basic `SQLContext`. Additional features include
the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the
ability to read data from Hive tables. To use a HiveContext, you do not need to have an
existing Hive setup, and all of the data sources available to a SQLContext are still available.
HiveContext is only packaged separately to avoid including all of Hive's dependencies in the default
Spark build. If these dependencies are not a problem for your application then using HiveContext
is recommended for the 1.3 release of Spark. Future releases will focus on bringing SQLContext up
to feature parity with a HiveContext.
ability to read data from Hive tables. To use a `HiveContext`, you do not need to have an
existing Hive setup, and all of the data sources available to a `SQLContext` are still available.
`HiveContext` is only packaged separately to avoid including all of Hive's dependencies in the default
Spark build. If these dependencies are not a problem for your application then using `HiveContext`
is recommended for the 1.3 release of Spark. Future releases will focus on bringing `SQLContext` up
to feature parity with a `HiveContext`.
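For instance, a minimal sketch of creating a `HiveContext` from an existing `SparkContext` (assuming the Hive module is included in your build):
{% highlight scala %}
// sc is an existing SparkContext; no working Hive installation is required.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// Queries can then be expressed in HiveQL.
hiveContext.sql("SHOW TABLES").show()
{% endhighlight %}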
The specific variant of SQL that is used to parse queries can also be selected using the
`spark.sql.dialect` option. This parameter can be changed using either the `setConf` method on
a SQLContext or by using a `SET key=value` command in SQL. For a SQLContext, the only dialect
available is "sql" which uses a simple SQL parser provided by Spark SQL. In a HiveContext, the
a `SQLContext` or by using a `SET key=value` command in SQL. For a `SQLContext`, the only dialect
available is "sql" which uses a simple SQL parser provided by Spark SQL. In a `HiveContext`, the
default is "hiveql", though "sql" is also available. Since the HiveQL parser is much more complete,
this is recommended for most use cases.
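As an illustrative sketch (reusing the `sqlContext` and `hiveContext` variables from above), the dialect can be selected either programmatically or from SQL:
{% highlight scala %}
// On a SQLContext only the "sql" dialect is available:
sqlContext.setConf("spark.sql.dialect", "sql")

// On a HiveContext either dialect may be chosen, e.g. with a SET command:
hiveContext.sql("SET spark.sql.dialect=hiveql")
{% endhighlight %}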
@@ -100,7 +100,7 @@ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.jsonFile("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
df.show()
{% endhighlight %}
</div>
@@ -151,10 +151,10 @@ val df = sqlContext.jsonFile("examples/src/main/resources/people.json")
// Show the content of the DataFrame
df.show()
// age name
// age name
// null Michael
// 30 Andy
// 19 Justin
// 30 Andy
// 19 Justin
// Print the schema in a tree format
df.printSchema()
@@ -164,17 +164,17 @@ df.printSchema()
// Select only the "name" column
df.select("name").show()
// name
// name
// Michael
// Andy
// Justin
// Andy
// Justin
// Select everybody, but increment the age by 1
df.select("name", df("age") + 1).show()
// name (age + 1)
// Michael null
// Andy 31
// Justin 20
// Michael null
// Andy 31
// Justin 20
// Select people older than 21
df.filter(df("age") > 21).show()
@@ -201,10 +201,10 @@ DataFrame df = sqlContext.jsonFile("examples/src/main/resources/people.json");
// Show the content of the DataFrame
df.show();
// age name
// age name
// null Michael
// 30 Andy
// 19 Justin
// 30 Andy
// 19 Justin
// Print the schema in a tree format
df.printSchema();
@@ -214,17 +214,17 @@ df.printSchema();
// Select only the "name" column
df.select("name").show();
// name
// name
// Michael
// Andy
// Justin
// Andy
// Justin
// Select everybody, but increment the age by 1
df.select("name", df.col("age").plus(1)).show();
// name (age + 1)
// Michael null
// Andy 31
// Justin 20
// Michael null
// Andy 31
// Justin 20
// Select people older than 21
df.filter(df.col("age").gt(21)).show();
@@ -251,10 +251,10 @@ df = sqlContext.jsonFile("examples/src/main/resources/people.json")
# Show the content of the DataFrame
df.show()
## age name
## age name
## null Michael
## 30 Andy
## 19 Justin
## 30 Andy
## 19 Justin
# Print the schema in a tree format
df.printSchema()
@@ -264,17 +264,17 @@ df.printSchema()
# Select only the "name" column
df.select("name").show()
## name
## name
## Michael
## Andy
## Justin
## Andy
## Justin
# Select everybody, but increment the age by 1
df.select("name", df.age + 1).show()
## name (age + 1)
## Michael null
## Andy 31
## Justin 20
## Michael null
## Andy 31
## Justin 20
# Select people older than 21
df.filter(df.age > 21).show()
@@ -797,7 +797,7 @@ When working with a `HiveContext`, `DataFrames` can also be saved as persistent
contents of the DataFrame and create a pointer to the data in the HiveMetastore. Persistent tables
will still exist even after your Spark program has restarted, as long as you maintain your connection
to the same metastore. A DataFrame for a persistent table can be created by calling the `table`
method on a SQLContext with the name of the table.
method on a `SQLContext` with the name of the table.
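For example (a sketch assuming a `HiveContext` bound to `sqlContext` and a hypothetical table name `people`):
{% highlight scala %}
// Persist the DataFrame as a table in the Hive metastore...
df.saveAsTable("people")

// ...and, possibly from a later Spark program, look it up again by name.
val people = sqlContext.table("people")
{% endhighlight %}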
By default `saveAsTable` will create a "managed table", meaning that the location of the data will
be controlled by the metastore. Managed tables will also have their data deleted automatically
@@ -907,9 +907,132 @@ SELECT * FROM parquetTable
</div>
### Partition discovery
Table partitioning is a common optimization approach used in systems like Hive. In a partitioned
table, data are usually stored in different directories, with partitioning column values encoded in
the path of each partition directory. The Parquet data source is now able to discover and infer
partitioning information automatically. For example, we can store all our previously used
population data into a partitioned table using the following directory structure, with two extra
columns, `gender` and `country`, as partitioning columns:
{% highlight text %}
path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...
{% endhighlight %}
By passing `path/to/table` to either `SQLContext.parquetFile` or `SQLContext.load`, Spark SQL will
automatically extract the partitioning information from the paths. Now the schema of the returned
DataFrame becomes:
{% highlight text %}
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
{% endhighlight %}
Notice that the data types of the partitioning columns are automatically inferred. Currently,
numeric data types and string type are supported.
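As a small sketch (reusing the layout above; the column values are hypothetical), the discovered partitioning columns behave like ordinary columns and can be used in filters:
{% highlight scala %}
// Load the partitioned table; gender and country become regular columns.
val people = sqlContext.parquetFile("path/to/table")

// Filter on the discovered partitioning columns just like on any other column.
people.filter(people("gender") === "male" && people("country") === "US").show()
{% endhighlight %}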
### Schema merging
Like ProtocolBuffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with
a simple schema, and gradually add more columns to the schema as needed. In this way, users may end
up with multiple Parquet files with different but mutually compatible schemas. The Parquet data
source is now able to automatically detect this case and merge schemas of all these files.
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Create a simple DataFrame, stored into a partition directory
val df1 = sparkContext.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.saveAsParquetFile("data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sparkContext.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.saveAsParquetFile("data/test_table/key=2")
// Read the partitioned table
val df3 = sqlContext.parquetFile("data/test_table")
df3.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column that appears in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key: int (nullable = true)
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
# sqlContext from the previous example is used in this example.
from pyspark.sql import Row

# Create a simple DataFrame, stored into a partition directory
df1 = sqlContext.createDataFrame(sc.parallelize(range(1, 6))\
.map(lambda i: Row(single=i, double=i * 2)))
df1.save("data/test_table/key=1", "parquet")
# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
df2 = sqlContext.createDataFrame(sc.parallelize(range(6, 11))
.map(lambda i: Row(single=i, triple=i * 3)))
df2.save("data/test_table/key=2", "parquet")
# Read the partitioned table
df3 = sqlContext.parquetFile("data/test_table")
df3.printSchema()
# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column that appears in the partition directory paths.
# root
# |-- single: int (nullable = true)
# |-- double: int (nullable = true)
# |-- triple: int (nullable = true)
# |-- key: int (nullable = true)
{% endhighlight %}
</div>
</div>
### Configuration
Configuration of Parquet can be done using the `setConf` method on SQLContext or by running
Configuration of Parquet can be done using the `setConf` method on `SQLContext` or by running
`SET key=value` commands using SQL.
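For example (a sketch using the existing `sqlContext`; see the table below for the available keys):
{% highlight scala %}
// Programmatically, via setConf:
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

// Or equivalently from SQL:
sqlContext.sql("SET spark.sql.parquet.compression.codec=snappy")
{% endhighlight %}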
<table class="table">
@@ -972,7 +1095,7 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or
<div data-lang="scala" markdown="1">
Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame.
This conversion can be done using one of two methods in a SQLContext:
This conversion can be done using one of two methods in a `SQLContext`:
* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.
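For instance, a minimal sketch of the `jsonFile` path, reusing the sample data shipped with Spark:
{% highlight scala %}
// sqlContext is an existing SQLContext.
val people = sqlContext.jsonFile("examples/src/main/resources/people.json")

// The inferred schema can be inspected directly.
people.printSchema()
{% endhighlight %}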
@@ -1014,7 +1137,7 @@ val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
<div data-lang="java" markdown="1">
Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame.
This conversion can be done using one of two methods in a SQLContext :
This conversion can be done using one of two methods in a `SQLContext`:
* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.
@@ -1056,7 +1179,7 @@ DataFrame anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD);
<div data-lang="python" markdown="1">
Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame.
This conversion can be done using one of two methods in a SQLContext:
This conversion can be done using one of two methods in a `SQLContext`:
* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.
@@ -1085,7 +1208,7 @@ people.printSchema()
# Register this DataFrame as a table.
people.registerTempTable("people")
# SQL statements can be run by using the sql methods provided by sqlContext.
# SQL statements can be run by using the sql methods provided by `sqlContext`.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
# Alternatively, a DataFrame can be created for a JSON dataset represented by
@@ -1131,7 +1254,7 @@ Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.
When working with Hive one must construct a `HiveContext`, which inherits from `SQLContext`, and
adds support for finding tables in the MetaStore and writing queries using HiveQL. Users who do
not have an existing Hive deployment can still create a HiveContext. When not configured by the
not have an existing Hive deployment can still create a `HiveContext`. When not configured by the
hive-site.xml, the context automatically creates `metastore_db` and `warehouse` in the current
directory.
@@ -1318,7 +1441,7 @@ Spark SQL can cache tables using an in-memory columnar format by calling `sqlCon
Then Spark SQL will scan only required columns and will automatically tune compression to minimize
memory usage and GC pressure. You can call `sqlContext.uncacheTable("tableName")` to remove the table from memory.
Configuration of in-memory caching can be done using the `setConf` method on SQLContext or by running
Configuration of in-memory caching can be done using the `setConf` method on `SQLContext` or by running
`SET key=value` commands using SQL.
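As a brief sketch (the table name `people` here is hypothetical), both the caching calls and the related options go through the same `SQLContext`:
{% highlight scala %}
// Example caching option; see the table below for the available keys.
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")

// Cache and later uncache a registered table by name.
sqlContext.cacheTable("people")
sqlContext.uncacheTable("people")
{% endhighlight %}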
<table class="table">
@@ -1429,10 +1552,10 @@ Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.
You may also use the beeline script that comes with Hive.
Thrift JDBC server also supports sending thrift RPC messages over HTTP transport.
Use the following setting to enable HTTP mode as system property or in `hive-site.xml` file in `conf/`:
Thrift JDBC server also supports sending thrift RPC messages over HTTP transport.
Use the following setting to enable HTTP mode as system property or in `hive-site.xml` file in `conf/`:
hive.server2.transport.mode - Set this to value: http
hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice
@@ -1506,7 +1629,7 @@ When using function inside of the DSL (now replaced with the `DataFrame` API) us
Spark 1.3 removes the type aliases that were present in the base sql package for `DataType`. Users
should instead import the classes in `org.apache.spark.sql.types`
#### UDF Registration Moved to sqlContext.udf (Java & Scala)
#### UDF Registration Moved to `sqlContext.udf` (Java & Scala)
Functions that are used to register UDFs, either for use in the DataFrame DSL or SQL, have been
moved into the `udf` object in `SQLContext`.
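A brief sketch of the new registration style (the UDF name and the `people` table are placeholders):
{% highlight scala %}
// Register a Scala function as a UDF that can be called from SQL.
sqlContext.udf.register("strLen", (s: String) => s.length)

sqlContext.sql("SELECT strLen(name) FROM people")
{% endhighlight %}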