---
layout: global
title: Parquet Files
displayTitle: Parquet Files
---

* Table of contents
{:toc}

[Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems.
Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema
of the original data. When writing Parquet files, all columns are automatically converted to be nullable for
compatibility reasons.

### Loading Data Programmatically

Using the data from the above example:

<div class="codetabs">

<div data-lang="scala" markdown="1">
{% include_example basic_parquet_example scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example basic_parquet_example java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">
{% include_example basic_parquet_example python/sql/datasource.py %}
</div>

<div data-lang="r" markdown="1">
{% include_example basic_parquet_example r/RSparkSQLExample.R %}
</div>

<div data-lang="sql" markdown="1">

{% highlight sql %}

CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable

{% endhighlight %}

</div>

</div>

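For quick reference, here is a minimal Scala sketch of the round trip the tabs above demonstrate. It assumes an existing `SparkSession` named `spark` and a DataFrame `peopleDF`, such as the one built from `people.json` earlier in this guide:

{% highlight scala %}
// Writing a DataFrame out as Parquet preserves its schema in the files.
peopleDF.write.parquet("people.parquet")

// Reading the files back yields a DataFrame with the original schema.
val parquetFileDF = spark.read.parquet("people.parquet")
parquetFileDF.createOrReplaceTempView("parquetFile")
spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19").show()
{% endhighlight %}
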
### Partition Discovery

Table partitioning is a common optimization approach used in systems like Hive. In a partitioned
table, data are usually stored in different directories, with partitioning column values encoded in
the path of each partition directory. All built-in file sources (including Text/CSV/JSON/ORC/Parquet)
are able to discover and infer partitioning information automatically.
For example, we can store all our previously used
population data into a partitioned table using the following directory structure, with two extra
columns, `gender` and `country` as partitioning columns:

{% highlight text %}

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

{% endhighlight %}

By passing `path/to/table` to either `SparkSession.read.parquet` or `SparkSession.read.load`, Spark SQL
will automatically extract the partitioning information from the paths.

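A minimal Scala sketch of this, using the illustrative `path/to/table` root from the listing above:

{% highlight scala %}
// Reading the table root lets Spark SQL discover the gender/country partitions.
val peopleDF = spark.read.parquet("path/to/table")
peopleDF.printSchema()
{% endhighlight %}
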
Now the schema of the returned DataFrame becomes:

{% highlight text %}

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

{% endhighlight %}

Notice that the data types of the partitioning columns are automatically inferred. Currently,
numeric data types, date, timestamp and string type are supported. Sometimes users may not want
to automatically infer the data types of the partitioning columns. For these use cases, the
automatic type inference can be configured by
`spark.sql.sources.partitionColumnTypeInference.enabled`, which defaults to `true`. When type
inference is disabled, string type will be used for the partitioning columns.

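For example, a minimal sketch of disabling the inference; the `year=2018` directory mentioned in the comment is a hypothetical numeric partition:

{% highlight scala %}
// With inference disabled, the value of a hypothetical numeric partition
// such as year=2018 is read as the string "2018" instead of an integer.
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
val df = spark.read.parquet("path/to/table")
{% endhighlight %}
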
Starting from Spark 1.6.0, partition discovery only finds partitions under the given paths
by default. For the above example, if users pass `path/to/table/gender=male` to either
`SparkSession.read.parquet` or `SparkSession.read.load`, `gender` will not be considered as a
partitioning column. If users need to specify the base path that partition discovery
should start with, they can set `basePath` in the data source options. For example,
when `path/to/table/gender=male` is the path of the data and
users set `basePath` to `path/to/table/`, `gender` will be a partitioning column.

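A minimal Scala sketch of both behaviors, again with the illustrative paths from above:

{% highlight scala %}
// Reading a single partition directory directly: no `gender` column is discovered.
val males = spark.read.parquet("path/to/table/gender=male")

// With basePath pointing at the table root, `gender` is recovered as a partition column.
val malesWithGender = spark.read
  .option("basePath", "path/to/table/")
  .parquet("path/to/table/gender=male")
{% endhighlight %}
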
### Schema Merging

Like Protocol Buffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with
a simple schema, and gradually add more columns to the schema as needed. In this way, users may end
up with multiple Parquet files with different but mutually compatible schemas. The Parquet data
source is now able to automatically detect this case and merge schemas of all these files.

Since schema merging is a relatively expensive operation, and is not a necessity in most cases, we
turned it off by default starting from 1.5.0. You may enable it by

1. setting data source option `mergeSchema` to `true` when reading Parquet files (as shown in the
   examples below), or
2. setting the global SQL option `spark.sql.parquet.mergeSchema` to `true`.

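As a minimal Scala sketch, the first option looks like this; complete runnable examples for each language follow in the tabs below:

{% highlight scala %}
// Ask the Parquet data source to merge the schemas of all files under the path.
val mergedDF = spark.read.option("mergeSchema", "true").parquet("path/to/table")
mergedDF.printSchema()
{% endhighlight %}
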
<div class="codetabs">

<div data-lang="scala" markdown="1">
{% include_example schema_merging scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example schema_merging java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">
{% include_example schema_merging python/sql/datasource.py %}
</div>

<div data-lang="r" markdown="1">
{% include_example schema_merging r/RSparkSQLExample.R %}
</div>

</div>

### Hive metastore Parquet table conversion

When reading from and writing to Hive metastore Parquet tables, Spark SQL will try to use its own
Parquet support instead of Hive SerDe for better performance. This behavior is controlled by the
`spark.sql.hive.convertMetastoreParquet` configuration, and is turned on by default.

#### Hive/Parquet Schema Reconciliation

There are two key differences between Hive and Parquet from the perspective of table schema
processing.

1. Hive is case insensitive, while Parquet is not
1. Hive considers all columns nullable, while nullability in Parquet is significant

Due to this reason, we must reconcile the Hive metastore schema with the Parquet schema when converting a
Hive metastore Parquet table to a Spark SQL Parquet table. The reconciliation rules are:

1. Fields that have the same name in both schemas must have the same data type regardless of
   nullability. The reconciled field should have the data type of the Parquet side, so that
   nullability is respected.

1. The reconciled schema contains exactly those fields defined in the Hive metastore schema.

   - Any fields that only appear in the Parquet schema are dropped in the reconciled schema.
   - Any fields that only appear in the Hive metastore schema are added as nullable fields in the
     reconciled schema.

#### Metadata Refreshing

Spark SQL caches Parquet metadata for better performance. When Hive metastore Parquet table
conversion is enabled, metadata of those converted tables are also cached. If these tables are
updated by Hive or other external tools, you need to refresh them manually to ensure consistent
metadata.

<div class="codetabs">

<div data-lang="scala" markdown="1">

{% highlight scala %}
// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
{% endhighlight %}

</div>

<div data-lang="java" markdown="1">

{% highlight java %}
// spark is an existing SparkSession
spark.catalog().refreshTable("my_table");
{% endhighlight %}

</div>

<div data-lang="python" markdown="1">

{% highlight python %}
# spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
{% endhighlight %}

</div>

<div data-lang="r" markdown="1">

{% highlight r %}
refreshTable("my_table")
{% endhighlight %}

</div>

<div data-lang="sql" markdown="1">

{% highlight sql %}
REFRESH TABLE my_table;
{% endhighlight %}

</div>

</div>

### Configuration

Configuration of Parquet can be done via `spark.conf.set` on a `SparkSession` or by running
`SET key=value` commands using SQL.

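For example, a minimal sketch of both styles in Scala; the property chosen here is just illustrative:

{% highlight scala %}
// Programmatically, through the session's runtime configuration...
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")

// ...or equivalently by running a SET command through SQL.
spark.sql("SET spark.sql.parquet.compression.codec=gzip")
{% endhighlight %}
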
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
  <td><code>spark.sql.parquet.binaryAsString</code></td>
  <td>false</td>
  <td>
    Some other Parquet-producing systems, in particular Impala, Hive, and older versions of Spark SQL, do
    not differentiate between binary data and strings when writing out the Parquet schema. This
    flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
  </td>
</tr>
<tr>
  <td><code>spark.sql.parquet.int96AsTimestamp</code></td>
  <td>true</td>
  <td>
    Some Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This
    flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
  </td>
</tr>
<tr>
  <td><code>spark.sql.parquet.compression.codec</code></td>
  <td>snappy</td>
  <td>
    Sets the compression codec used when writing Parquet files. If either <code>compression</code> or
    <code>parquet.compression</code> is specified in the table-specific options/properties, the precedence would be
    <code>compression</code>, <code>parquet.compression</code>, <code>spark.sql.parquet.compression.codec</code>.
    Acceptable values include: none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
    Note that <code>zstd</code> requires <code>ZStandardCodec</code> to be installed before Hadoop 2.9.0, and
    <code>brotli</code> requires <code>BrotliCodec</code> to be installed.
  </td>
</tr>
<tr>
  <td><code>spark.sql.parquet.filterPushdown</code></td>
  <td>true</td>
  <td>Enables Parquet filter push-down optimization when set to true.</td>
</tr>
<tr>
  <td><code>spark.sql.hive.convertMetastoreParquet</code></td>
  <td>true</td>
  <td>
    When set to false, Spark SQL will use the Hive SerDe for Parquet tables instead of the built-in
    support.
  </td>
</tr>
<tr>
  <td><code>spark.sql.parquet.mergeSchema</code></td>
  <td>false</td>
  <td>
    <p>
      When true, the Parquet data source merges schemas collected from all data files, otherwise the
      schema is picked from the summary file or a random data file if no summary file is available.
    </p>
  </td>
</tr>
<tr>
  <td><code>spark.sql.optimizer.metadataOnly</code></td>
  <td>true</td>
  <td>
    <p>
      When true, enable the metadata-only query optimization that uses the table's metadata to
      produce the partition columns instead of table scans. It applies when all the columns scanned
      are partition columns and the query has an aggregate operator that satisfies distinct
      semantics.
    </p>
  </td>
</tr>
<tr>
  <td><code>spark.sql.parquet.writeLegacyFormat</code></td>
  <td>false</td>
  <td>
    If true, data will be written in the same way as Spark 1.4 and earlier. For example, decimal values
    will be written in Apache Parquet's fixed-length byte array format, which other systems such as
    Apache Hive and Apache Impala use. If false, the newer format in Parquet will be used. For
    example, decimals will be written in int-based format. If Parquet output is intended for use
    with systems that do not support this newer format, set to true.
  </td>
</tr>
</table>

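To illustrate the compression-codec precedence described in the table above, here is a minimal sketch assuming an existing DataFrame `df` and an illustrative output path:

{% highlight scala %}
// The per-write `compression` option takes precedence over both the
// `parquet.compression` table property and the session-wide default.
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
df.write.option("compression", "gzip").parquet("path/to/output")  // written with gzip
{% endhighlight %}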