[SPARK-5183][SQL] Update SQL Docs with JDBC and Migration Guide

Author: Michael Armbrust <michael@databricks.com>

Closes #4958 from marmbrus/sqlDocs and squashes the following commits:

9351dbc [Michael Armbrust] fix parquet example
6877e13 [Michael Armbrust] add sql examples
d81b7e7 [Michael Armbrust] rxins comments
e393528 [Michael Armbrust] fix order
19c2735 [Michael Armbrust] more on data source load/store
00d5914 [Michael Armbrust] Update SQL Docs with JDBC and Migration Guide
Committed by Michael Armbrust on 2015-03-10 18:13:09 -07:00
parent 74fb433702
commit 2672374110


@@ -9,7 +9,7 @@ title: Spark SQL and DataFrames
# Overview
Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed query engine.
Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.
# DataFrames
@@ -662,8 +662,146 @@ for name in names.collect():
Spark SQL supports operating on a variety of data sources through the `DataFrame` interface.
A DataFrame can be operated on as normal RDDs and can also be registered as a temporary table.
Registering a DataFrame as a table allows you to run SQL queries over its data. This section
describes the various methods for loading data into a DataFrame.
describes the general methods for loading and saving data using the Spark Data Sources and then
goes into specific options that are available for the built-in data sources.
## Generic Load/Save Functions
In the simplest form, the default data source (`parquet` unless otherwise configured by
`spark.sql.sources.default`) will be used for all operations.
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
val df = sqlContext.load("people.parquet")
df.select("name", "age").save("namesAndAges.parquet")
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
DataFrame df = sqlContext.load("people.parquet");
df.select("name", "age").save("namesAndAges.parquet");
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
df = sqlContext.load("people.parquet")
df.select("name", "age").save("namesAndAges.parquet")
{% endhighlight %}
</div>
</div>
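The default data source can be changed for a running `SQLContext`; a minimal Scala sketch, assuming the `spark.sql.sources.default` key mentioned above is set through `setConf` (file paths are illustrative):

{% highlight scala %}
// Use JSON instead of parquet as the default data source.
sqlContext.setConf("spark.sql.sources.default", "json")

// load/save calls with no explicit source name now go through the JSON source.
val df = sqlContext.load("examples/src/main/resources/people.json")
df.select("name", "age").save("namesAndAges.json")
{% endhighlight %}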
### Manually Specifying Options
You can also manually specify the data source that will be used along with any extra options
that you would like to pass to the data source. Data sources are specified by their fully qualified
name (i.e., `org.apache.spark.sql.parquet`), but for built-in sources you can also use their short
names (`json`, `parquet`, `jdbc`). DataFrames of any type can be converted into other types
using this syntax.
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
val df = sqlContext.load("people.json", "json")
df.select("name", "age").save("namesAndAges.parquet", "parquet")
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
DataFrame df = sqlContext.load("people.json", "json");
df.select("name", "age").save("namesAndAges.parquet", "parquet");
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
df = sqlContext.load("people.json", "json")
df.select("name", "age").save("namesAndAges.parquet", "parquet")
{% endhighlight %}
</div>
</div>
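Extra options are passed as string key/value pairs; for the built-in file-based sources the most common one is `path`. A hedged Scala sketch using the `Map`-based overload of `load` (the same overload used in the JDBC examples later in this guide):

{% highlight scala %}
// "path" tells the JSON source where to read from; other keys are source-specific.
val df = sqlContext.load("json", Map("path" -> "examples/src/main/resources/people.json"))
df.select("name", "age").save("namesAndAges.parquet", "parquet")
{% endhighlight %}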
### Save Modes
Save operations can optionally take a `SaveMode`, which specifies how to handle existing data if
present. It is important to realize that these save modes do not utilize any locking and are not
atomic. Thus, it is not safe to have multiple writers attempting to write to the same location.
Additionally, when performing an `Overwrite`, the data will be deleted before writing out the
new data.
<table class="table">
<tr><th>Scala/Java</th><th>Python</th><th>Meaning</th></tr>
<tr>
<td><code>SaveMode.ErrorIfExists</code> (default)</td>
<td><code>"error"</code> (default)</td>
<td>
When saving a DataFrame to a data source, if data already exists,
an exception is expected to be thrown.
</td>
</tr>
<tr>
<td><code>SaveMode.Append</code></td>
<td><code>"append"</code></td>
<td>
When saving a DataFrame to a data source, if data/table already exists,
contents of the DataFrame are expected to be appended to existing data.
</td>
</tr>
<tr>
<td><code>SaveMode.Overwrite</code></td>
<td><code>"overwrite"</code></td>
<td>
Overwrite mode means that when saving a DataFrame to a data source,
if data/table already exists, existing data is expected to be overwritten by the contents of
the DataFrame.
</td>
</tr>
<tr>
<td><code>SaveMode.Ignore</code></td>
<td><code>"ignore"</code></td>
<td>
Ignore mode means that when saving a DataFrame to a data source, if data already exists,
the save operation is expected to not save the contents of the DataFrame and to not
change the existing data. This is similar to a <code>CREATE TABLE IF NOT EXISTS</code> in SQL.
</td>
</tr>
</table>
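In Scala, for example, a save mode can be passed alongside the path and source name; a minimal sketch, assuming the overload of `save` that takes a path, a source name, and a `SaveMode`:

{% highlight scala %}
import org.apache.spark.sql.SaveMode

val df = sqlContext.load("people.json", "json")
// Append to any existing data at the target location instead of throwing an error.
df.select("name", "age").save("namesAndAges.parquet", "parquet", SaveMode.Append)
{% endhighlight %}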
### Saving to Persistent Tables
When working with a `HiveContext`, `DataFrames` can also be saved as persistent tables using the
`saveAsTable` command. Unlike the `registerTempTable` command, `saveAsTable` will materialize the
contents of the DataFrame and create a pointer to the data in the Hive metastore. Persistent tables
will still exist even after your Spark program has restarted, as long as you maintain your connection
to the same metastore. A DataFrame for a persistent table can be created by calling the `table`
method on a SQLContext with the name of the table.
By default `saveAsTable` will create a "managed table", meaning that the location of the data will
be controlled by the metastore. Managed tables will also have their data deleted automatically
when a table is dropped.
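A short Scala sketch of the round trip, assuming a `HiveContext` named `sqlContext` and an illustrative table name:

{% highlight scala %}
// Materialize the DataFrame as a persistent table tracked by the Hive metastore.
df.saveAsTable("people")

// Later, possibly from a different Spark program attached to the same metastore,
// recover a DataFrame for the table by name.
val people = sqlContext.table("people")
{% endhighlight %}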
## Parquet Files
@@ -751,6 +889,22 @@ for teenName in teenNames.collect():
</div>
<div data-lang="sql" markdown="1">
{% highlight sql %}
CREATE TEMPORARY TABLE parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable
{% endhighlight %}
</div>
</div>
### Configuration
@@ -942,6 +1096,22 @@ anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
{% endhighlight %}
</div>
<div data-lang="sql" markdown="1">
{% highlight sql %}
CREATE TEMPORARY TABLE jsonTable
USING org.apache.spark.sql.json
OPTIONS (
  path "examples/src/main/resources/people.json"
)

SELECT * FROM jsonTable
{% endhighlight %}
</div>
</div>
## Hive Tables
@@ -1022,6 +1192,121 @@ results = sqlContext.sql("FROM src SELECT key, value").collect()
</div>
</div>
## JDBC To Other Databases
Spark SQL also includes a data source that can read data from other databases using JDBC. This
functionality should be preferred over using [JdbcRDD](api/scala/index.html#org.apache.spark.rdd.JdbcRDD).
This is because the results are returned
as a DataFrame and they can easily be processed in Spark SQL or joined with other data sources.
The JDBC data source is also easier to use from Java or Python as it does not require the user to
provide a ClassTag.
(Note that this is different than the Spark SQL JDBC server, which allows other applications to
run queries using Spark SQL).
To get started you will need to include the JDBC driver for your particular database on the
Spark classpath. For example, to connect to postgres from the Spark Shell you would run the
following command:
{% highlight bash %}
SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell
{% endhighlight %}
Tables from the remote database can be loaded as a DataFrame or Spark SQL Temporary table using
the Data Sources API. The following options are supported:
<table class="table">
<tr><th>Property Name</th><th>Meaning</th></tr>
<tr>
<td><code>url</code></td>
<td>
The JDBC URL to connect to.
</td>
</tr>
<tr>
<td><code>dbtable</code></td>
<td>
The JDBC table that should be read. Note that anything that is valid in a `FROM` clause of
a SQL query can be used. For example, instead of a full table you could also use a
subquery in parentheses.
</td>
</tr>
<tr>
<td><code>driver</code></td>
<td>
The class name of the JDBC driver needed to connect to this URL. This class will be loaded
on the master and workers before running any JDBC commands, to allow the driver to
register itself with the JDBC subsystem.
</td>
</tr>
<tr>
<td><code>partitionColumn, lowerBound, upperBound, numPartitions</code></td>
<td>
These options must all be specified if any of them is specified. They describe how to
partition the table when reading in parallel from multiple workers.
<code>partitionColumn</code> must be a numeric column from the table in question.
</td>
</tr>
</table>
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
val jdbcDF = sqlContext.load("jdbc", Map(
"url" -> "jdbc:postgresql:dbserver",
"dbtable" -> "schema.tablename"))
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:postgresql:dbserver");
options.put("dbtable", "schema.tablename");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
df = sqlContext.load("jdbc", url="jdbc:postgresql:dbserver", dbtable="schema.tablename")
{% endhighlight %}
</div>
<div data-lang="sql" markdown="1">
{% highlight sql %}
CREATE TEMPORARY TABLE jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename"
)
{% endhighlight %}
</div>
</div>
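The partitioning options listed in the table above are passed in the same way; a hedged Scala sketch (the column name and bounds are illustrative and must match a numeric column in your table):

{% highlight scala %}
// Read the table with 4 concurrent partitions, split on the numeric "id" column.
val jdbcDF = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename",
  "partitionColumn" -> "id",
  "lowerBound" -> "1",
  "upperBound" -> "100000",
  "numPartitions" -> "4"))
{% endhighlight %}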
## Troubleshooting
* The JDBC driver class must be visible to the primordial class loader on the client session and on all executors. This is because Java's DriverManager class does a security check that results in it ignoring all drivers not visible to the primordial class loader when one goes to open a connection. One convenient way to do this is to modify compute_classpath.sh on all worker nodes to include your driver JARs.
* Some databases, such as H2, convert all names to upper case. You'll need to use upper case to refer to those names in Spark SQL.
# Performance Tuning
For some workloads it is possible to improve performance by either caching data in memory, or by
@@ -1092,7 +1377,7 @@ that these options will be deprecated in future release as more optimizations are
</tr>
</table>
# Distributed Query Engine
# Distributed SQL Engine
Spark SQL can also act as a distributed query engine using its JDBC/ODBC or command-line interface. In this mode, end-users or applications can interact with Spark SQL directly to run SQL queries, without the need to write any code.
@@ -1171,6 +1456,87 @@ options.
# Migration Guide
## Upgrading from Spark SQL 1.0-1.2 to 1.3
In Spark 1.3 we removed the "Alpha" label from Spark SQL and as part of this did a cleanup of the
available APIs. From Spark 1.3 onwards, Spark SQL will provide binary compatibility with other
releases in the 1.X series. This compatibility guarantee excludes APIs that are explicitly marked
as unstable (i.e., DeveloperAPI or Experimental).
#### Rename of SchemaRDD to DataFrame
The largest change that users will notice when upgrading to Spark SQL 1.3 is that `SchemaRDD` has
been renamed to `DataFrame`. This is primarily because DataFrames no longer inherit from RDD
directly, but instead provide most of the functionality that RDDs provide through their own
implementation. DataFrames can still be converted to RDDs by calling the `.rdd` method.
In Scala there is a type alias from `SchemaRDD` to `DataFrame` to provide source compatibility for
some use cases. It is still recommended that users update their code to use `DataFrame` instead.
Java and Python users will need to update their code.
#### Unification of the Java and Scala APIs
Prior to Spark 1.3 there were separate Java compatible classes (`JavaSQLContext` and `JavaSchemaRDD`)
that mirrored the Scala API. In Spark 1.3 the Java API and Scala API have been unified. Users
of either language should use `SQLContext` and `DataFrame`. In general these classes try to
use types that are usable from both languages (i.e. `Array` instead of language-specific collections).
In some cases where no common type exists (e.g., for passing in closures or Maps) function overloading
is used instead.
Additionally, the Java-specific types API has been removed. Users of both Scala and Java should
use the classes present in `org.apache.spark.sql.types` to describe schema programmatically.
#### Isolation of Implicit Conversions and Removal of dsl Package (Scala-only)
Many of the code examples prior to Spark 1.3 started with `import sqlContext._`, which brought
all of the functions from sqlContext into scope. In Spark 1.3 we have isolated the implicit
conversions for converting `RDD`s into `DataFrame`s into an object inside of the `SQLContext`.
Users should now write `import sqlContext.implicits._`.
Additionally, the implicit conversions now only augment RDDs that are composed of `Product`s (i.e.,
case classes or tuples) with a method `toDF`, instead of applying automatically.
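A short Scala sketch of the new pattern, assuming an existing `SparkContext` named `sc` (the case class and data are illustrative):

{% highlight scala %}
import sqlContext.implicits._

case class Person(name: String, age: Int)

// The conversion no longer happens automatically; call toDF explicitly.
val people = sc.parallelize(Seq(Person("Alice", 30), Person("Bob", 25))).toDF()
people.registerTempTable("people")
{% endhighlight %}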
When using functions inside of the DSL (now replaced with the `DataFrame` API), users used to import
`org.apache.spark.sql.catalyst.dsl`. Instead, the public DataFrame functions API should be used:
`import org.apache.spark.sql.functions._`.
#### Removal of the type aliases in org.apache.spark.sql for DataType (Scala-only)
Spark 1.3 removes the type aliases that were present in the base sql package for `DataType`. Users
should instead import the classes in `org.apache.spark.sql.types`.
#### UDF Registration Moved to sqlContext.udf (Java & Scala)
Functions that are used to register UDFs, either for use in the DataFrame DSL or SQL, have been
moved into the udf object in `SQLContext`.
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
sqlCtx.udf.register("strLen", (s: String) => s.length())
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
// In Java the return type is given explicitly (assumes DataTypes from org.apache.spark.sql.types).
sqlCtx.udf().register("strLen", (String s) -> s.length(), DataTypes.IntegerType);
{% endhighlight %}
</div>
</div>
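Once registered, the UDF can be used from SQL as before; a minimal sketch, assuming a temporary table named `people` has already been registered:

{% highlight scala %}
// Call the UDF registered above from a SQL query.
sqlCtx.sql("SELECT name, strLen(name) FROM people").show()
{% endhighlight %}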
Python UDF registration is unchanged.
#### Python DataTypes No Longer Singletons
When using DataTypes in Python you will need to construct them (e.g., `StringType()`) instead of
referencing a singleton.
## Migration Guide for Shark Users
### Scheduling
@@ -1289,15 +1655,10 @@ in Hive deployments.
* Tables with buckets: bucket is the hash partitioning within a Hive table partition. Spark SQL
doesn't support buckets yet.
**Esoteric Hive Features**
* Tables with partitions using different input formats: In Spark SQL, all table partitions need to
have the same input format.
* Non-equi outer join: For the uncommon use case of using outer joins with non-equi join conditions
(e.g. condition "`key < 10`"), Spark SQL will output wrong result for the `NULL` tuple.
**Esoteric Hive Features**
* `UNION` type
* Unique join
* Single query multi insert
* Column statistics collecting: Spark SQL does not piggyback scans to collect column statistics at
the moment and only supports populating the sizeInBytes field of the hive metastore.
@@ -1313,9 +1674,6 @@ less important due to Spark SQL's in-memory computational model. Others are slotted for future
releases of Spark SQL.
* Block level bitmap indexes and virtual columns (used to build indexes)
* Automatically convert a join to map join: For joining a large table with multiple small tables,
Hive automatically converts the join into a map join. We are adding this auto conversion in the
next release.
* Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you
need to control the degree of parallelism post-shuffle using "`SET spark.sql.shuffle.partitions=[num_tasks];`".
* Meta-data only query: For queries that can be answered by using only meta data, Spark SQL still