---
layout: global
title: Parquet Files
displayTitle: Parquet Files
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---

* Table of contents
{:toc}

[Parquet](https://parquet.apache.org) is a columnar format that is supported by many other data processing systems.
Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema
of the original data. When reading Parquet files, all columns are automatically converted to be nullable for
compatibility reasons.

### Loading Data Programmatically

Using the data from the above example:

<div class="codetabs">

<div data-lang="scala" markdown="1">
{% include_example basic_parquet_example scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example basic_parquet_example java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">
{% include_example basic_parquet_example python/sql/datasource.py %}
</div>

<div data-lang="r" markdown="1">
{% include_example basic_parquet_example r/RSparkSQLExample.R %}
</div>

<div data-lang="SQL" markdown="1">

{% highlight sql %}
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable
{% endhighlight %}

</div>

</div>

### Partition Discovery

Table partitioning is a common optimization approach used in systems like Hive. In a partitioned
table, data are usually stored in different directories, with partitioning column values encoded in
the path of each partition directory. All built-in file sources (including Text/CSV/JSON/ORC/Parquet)
are able to discover and infer partitioning information automatically.
For example, we can store all our previously used
population data into a partitioned table using the following directory structure, with two extra
columns, `gender` and `country`, as partitioning columns:

{% highlight text %}
path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...
{% endhighlight %}

By passing `path/to/table` to either `SparkSession.read.parquet` or `SparkSession.read.load`, Spark SQL
will automatically extract the partitioning information from the paths.
Now the schema of the returned DataFrame becomes:

{% highlight text %}
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
{% endhighlight %}

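For illustration, a minimal sketch of producing the schema above; the path is the hypothetical one from the directory layout:

{% highlight scala %}
// Partition columns `gender` and `country` are appended to the schema
// automatically during partition discovery.
val df = spark.read.parquet("path/to/table")
df.printSchema()
{% endhighlight %}
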
Notice that the data types of the partitioning columns are automatically inferred. Currently,
numeric data types, date, timestamp and string types are supported. Sometimes users may not want
to automatically infer the data types of the partitioning columns. For these use cases, the
automatic type inference can be configured by
`spark.sql.sources.partitionColumnTypeInference.enabled`, which defaults to `true`. When type
inference is disabled, string type will be used for the partitioning columns.

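A minimal sketch of switching the inference off, assuming `spark` is an existing `SparkSession`:

{% highlight scala %}
// Disable partition column type inference: partition values such as
// `country=CN` are then read back as strings instead of inferred types.
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
{% endhighlight %}
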
Starting from Spark 1.6.0, partition discovery only finds partitions under the given paths
by default. For the above example, if users pass `path/to/table/gender=male` to either
`SparkSession.read.parquet` or `SparkSession.read.load`, `gender` will not be considered as a
partitioning column. If users need to specify the base path that partition discovery
should start with, they can set `basePath` in the data source options. For example,
when `path/to/table/gender=male` is the path of the data and
users set `basePath` to `path/to/table/`, `gender` will be a partitioning column.

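A short sketch of the `basePath` option described above; the paths are the hypothetical ones from the example:

{% highlight scala %}
// Only the `gender=male` subtree is read, but because basePath points at
// the table root, `gender` is still discovered as a partitioning column.
val df = spark.read
  .option("basePath", "path/to/table/")
  .parquet("path/to/table/gender=male")
{% endhighlight %}
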
### Schema Merging

Like Protocol Buffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with
a simple schema, and gradually add more columns to the schema as needed. In this way, users may end
up with multiple Parquet files with different but mutually compatible schemas. The Parquet data
source is now able to automatically detect this case and merge schemas of all these files.

Since schema merging is a relatively expensive operation, and is not a necessity in most cases, we
turned it off by default starting from 1.5.0. You may enable it by

1. setting data source option `mergeSchema` to `true` when reading Parquet files (as shown in the
   examples below), or
2. setting the global SQL option `spark.sql.parquet.mergeSchema` to `true`.

<div class="codetabs">

<div data-lang="scala" markdown="1">
{% include_example schema_merging scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example schema_merging java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">
{% include_example schema_merging python/sql/datasource.py %}
</div>

<div data-lang="r" markdown="1">
{% include_example schema_merging r/RSparkSQLExample.R %}
</div>

</div>

### Hive metastore Parquet table conversion

When reading from Hive metastore Parquet tables and writing to non-partitioned Hive metastore
Parquet tables, Spark SQL will try to use its own Parquet support instead of Hive SerDe for
better performance. This behavior is controlled by the `spark.sql.hive.convertMetastoreParquet`
configuration, and is turned on by default.

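A minimal sketch of turning the conversion off, assuming `spark` is an existing `SparkSession`:

{% highlight scala %}
// Fall back to the Hive SerDe instead of Spark's built-in Parquet support.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
{% endhighlight %}
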
#### Hive/Parquet Schema Reconciliation

There are two key differences between Hive and Parquet from the perspective of table schema
processing.

1. Hive is case insensitive, while Parquet is not
1. Hive considers all columns nullable, while nullability in Parquet is significant

For this reason, we must reconcile the Hive metastore schema with the Parquet schema when converting a
Hive metastore Parquet table to a Spark SQL Parquet table. The reconciliation rules are:

1. Fields that have the same name in both schemas must have the same data type regardless of
   nullability. The reconciled field should have the data type of the Parquet side, so that
   nullability is respected.

1. The reconciled schema contains exactly those fields defined in the Hive metastore schema.

   - Any fields that only appear in the Parquet schema are dropped in the reconciled schema.
   - Any fields that only appear in the Hive metastore schema are added as nullable fields in the
     reconciled schema.

#### Metadata Refreshing

Spark SQL caches Parquet metadata for better performance. When Hive metastore Parquet table
conversion is enabled, metadata of those converted tables is also cached. If these tables are
updated by Hive or other external tools, you need to refresh them manually to ensure consistent
metadata.

<div class="codetabs">

<div data-lang="scala" markdown="1">

{% highlight scala %}
// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
{% endhighlight %}

</div>

<div data-lang="java" markdown="1">

{% highlight java %}
// spark is an existing SparkSession
spark.catalog().refreshTable("my_table");
{% endhighlight %}

</div>

<div data-lang="python" markdown="1">

{% highlight python %}
# spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
{% endhighlight %}

</div>

<div data-lang="r" markdown="1">

{% highlight r %}
refreshTable("my_table")
{% endhighlight %}

</div>

<div data-lang="SQL" markdown="1">

{% highlight sql %}
REFRESH TABLE my_table;
{% endhighlight %}

</div>

</div>

## Data Source Option

Data source options of Parquet can be set via:

* the `.option`/`.options` methods of `DataFrameReader` or `DataFrameWriter`
* the `.option`/`.options` methods of `DataStreamReader` or `DataStreamWriter`

<table class="table">
  <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th><th><b>Scope</b></th></tr>
  <tr>
    <td><code>datetimeRebaseMode</code></td>
    <td>The value of the SQL config <code>spark.sql.parquet.datetimeRebaseModeInRead</code>, which is <code>EXCEPTION</code> by default</td>
    <td>The <code>datetimeRebaseMode</code> option allows specifying the rebasing mode for the values of the <code>DATE</code>, <code>TIMESTAMP_MILLIS</code>, <code>TIMESTAMP_MICROS</code> logical types from the Julian to the Proleptic Gregorian calendar.<br>
    Currently supported modes are:
    <ul>
      <li><code>EXCEPTION</code>: fails reads of ancient dates/timestamps that are ambiguous between the two calendars.</li>
      <li><code>CORRECTED</code>: loads dates/timestamps without rebasing.</li>
      <li><code>LEGACY</code>: rebases ancient dates/timestamps from the Julian to the Proleptic Gregorian calendar.</li>
    </ul>
    </td>
    <td>read</td>
  </tr>
  <tr>
    <td><code>int96RebaseMode</code></td>
    <td>The value of the SQL config <code>spark.sql.parquet.int96RebaseModeInRead</code>, which is <code>EXCEPTION</code> by default</td>
    <td>The <code>int96RebaseMode</code> option allows specifying the rebasing mode for INT96 timestamps from the Julian to the Proleptic Gregorian calendar.<br>
    Currently supported modes are:
    <ul>
      <li><code>EXCEPTION</code>: fails reads of ancient INT96 timestamps that are ambiguous between the two calendars.</li>
      <li><code>CORRECTED</code>: loads INT96 timestamps without rebasing.</li>
      <li><code>LEGACY</code>: rebases ancient INT96 timestamps from the Julian to the Proleptic Gregorian calendar.</li>
    </ul>
    </td>
    <td>read</td>
  </tr>
</table>

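Both options can also be applied per read; a minimal sketch, with a hypothetical path:

{% highlight scala %}
// Override the session-wide rebase configs for a single read of files
// whose writer info is unknown.
val df = spark.read
  .option("datetimeRebaseMode", "CORRECTED")
  .option("int96RebaseMode", "CORRECTED")
  .parquet("path/to/legacy/parquet")
{% endhighlight %}
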
### Configuration

Configuration of Parquet can be done using the `setConf` method on `SparkSession` or by running
`SET key=value` commands using SQL.

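For example, a minimal sketch of both ways, assuming `spark` is an existing `SparkSession` (`spark.conf.set` is the programmatic equivalent):

{% highlight scala %}
// Programmatically:
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
// Or with a SQL command:
spark.sql("SET spark.sql.parquet.mergeSchema=true")
{% endhighlight %}
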
<table class="table">
  <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
  <tr>
    <td><code>spark.sql.parquet.binaryAsString</code></td>
    <td>false</td>
    <td>
      Some other Parquet-producing systems, in particular Impala, Hive, and older versions of Spark SQL, do
      not differentiate between binary data and strings when writing out the Parquet schema. This
      flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
    </td>
    <td>1.1.1</td>
  </tr>
  <tr>
    <td><code>spark.sql.parquet.int96AsTimestamp</code></td>
    <td>true</td>
    <td>
      Some Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This
      flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
    </td>
    <td>1.3.0</td>
  </tr>
  <tr>
    <td><code>spark.sql.parquet.compression.codec</code></td>
    <td>snappy</td>
    <td>
      Sets the compression codec used when writing Parquet files. If either <code>compression</code> or
      <code>parquet.compression</code> is specified in the table-specific options/properties, the precedence would be
      <code>compression</code>, <code>parquet.compression</code>, <code>spark.sql.parquet.compression.codec</code>. Acceptable values include:
      none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
      Note that <code>zstd</code> requires <code>ZStandardCodec</code> to be installed before Hadoop 2.9.0, and <code>brotli</code> requires
      <code>BrotliCodec</code> to be installed.
    </td>
    <td>1.1.1</td>
  </tr>
  <tr>
    <td><code>spark.sql.parquet.filterPushdown</code></td>
    <td>true</td>
    <td>Enables Parquet filter push-down optimization when set to true.</td>
    <td>1.2.0</td>
  </tr>
  <tr>
    <td><code>spark.sql.hive.convertMetastoreParquet</code></td>
    <td>true</td>
    <td>
      When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of the built-in
      support.
    </td>
    <td>1.1.1</td>
  </tr>
  <tr>
    <td><code>spark.sql.parquet.mergeSchema</code></td>
    <td>false</td>
    <td>
      <p>
        When true, the Parquet data source merges schemas collected from all data files, otherwise the
        schema is picked from the summary file or a random data file if no summary file is available.
      </p>
    </td>
    <td>1.5.0</td>
  </tr>
  <tr>
    <td><code>spark.sql.parquet.writeLegacyFormat</code></td>
    <td>false</td>
    <td>
      If true, data will be written in the way of Spark 1.4 and earlier. For example, decimal values
      will be written in Apache Parquet's fixed-length byte array format, which other systems such as
      Apache Hive and Apache Impala use. If false, the newer format in Parquet will be used. For
      example, decimals will be written in int-based format. If Parquet output is intended for use
      with systems that do not support this newer format, set to true.
    </td>
    <td>1.6.0</td>
  </tr>
  <tr>
    <td><code>spark.sql.parquet.datetimeRebaseModeInRead</code></td>
    <td><code>EXCEPTION</code></td>
    <td>The rebasing mode for the values of the <code>DATE</code>, <code>TIMESTAMP_MILLIS</code>, <code>TIMESTAMP_MICROS</code> logical types from the Julian to the Proleptic Gregorian calendar:<br>
      <ul>
        <li><code>EXCEPTION</code>: Spark will fail the reading if it sees ancient dates/timestamps that are ambiguous between the two calendars.</li>
        <li><code>CORRECTED</code>: Spark will not rebase and will read the dates/timestamps as they are.</li>
        <li><code>LEGACY</code>: Spark will rebase dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar to the Proleptic Gregorian calendar when reading Parquet files.</li>
      </ul>
      This config is only effective if the writer info (like Spark, Hive) of the Parquet files is unknown.
    </td>
    <td>3.0.0</td>
  </tr>
  <tr>
    <td><code>spark.sql.parquet.datetimeRebaseModeInWrite</code></td>
    <td><code>EXCEPTION</code></td>
    <td>The rebasing mode for the values of the <code>DATE</code>, <code>TIMESTAMP_MILLIS</code>, <code>TIMESTAMP_MICROS</code> logical types from the Proleptic Gregorian to the Julian calendar:<br>
      <ul>
        <li><code>EXCEPTION</code>: Spark will fail the writing if it sees ancient dates/timestamps that are ambiguous between the two calendars.</li>
        <li><code>CORRECTED</code>: Spark will not rebase and will write the dates/timestamps as they are.</li>
        <li><code>LEGACY</code>: Spark will rebase dates/timestamps from the Proleptic Gregorian calendar to the legacy hybrid (Julian + Gregorian) calendar when writing Parquet files.</li>
      </ul>
    </td>
    <td>3.0.0</td>
  </tr>
  <tr>
    <td><code>spark.sql.parquet.int96RebaseModeInRead</code></td>
    <td><code>EXCEPTION</code></td>
    <td>The rebasing mode for the values of the <code>INT96</code> timestamp type from the Julian to the Proleptic Gregorian calendar:<br>
      <ul>
        <li><code>EXCEPTION</code>: Spark will fail the reading if it sees ancient INT96 timestamps that are ambiguous between the two calendars.</li>
        <li><code>CORRECTED</code>: Spark will not rebase and will read the timestamps as they are.</li>
        <li><code>LEGACY</code>: Spark will rebase INT96 timestamps from the legacy hybrid (Julian + Gregorian) calendar to the Proleptic Gregorian calendar when reading Parquet files.</li>
      </ul>
      This config is only effective if the writer info (like Spark, Hive) of the Parquet files is unknown.
    </td>
    <td>3.1.0</td>
  </tr>
  <tr>
    <td><code>spark.sql.parquet.int96RebaseModeInWrite</code></td>
    <td><code>EXCEPTION</code></td>
    <td>The rebasing mode for the values of the <code>INT96</code> timestamp type from the Proleptic Gregorian to the Julian calendar:<br>
      <ul>
        <li><code>EXCEPTION</code>: Spark will fail the writing if it sees ancient timestamps that are ambiguous between the two calendars.</li>
        <li><code>CORRECTED</code>: Spark will not rebase and will write the timestamps as they are.</li>
        <li><code>LEGACY</code>: Spark will rebase INT96 timestamps from the Proleptic Gregorian calendar to the legacy hybrid (Julian + Gregorian) calendar when writing Parquet files.</li>
      </ul>
    </td>
    <td>3.1.0</td>
  </tr>
</table>
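
A hedged sketch of opting into <code>CORRECTED</code> mode session-wide, under the assumption that all files being read were written with the Proleptic Gregorian calendar (e.g. by Spark 3.0+):

{% highlight scala %}
// No rebasing on read: assumes the files' dates/timestamps are already
// in the Proleptic Gregorian calendar.
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInRead", "CORRECTED")
spark.conf.set("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED")
{% endhighlight %}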