---
layout: global
title: Parquet Files
displayTitle: Parquet Files
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---
* Table of contents
{:toc}

[Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems.
Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema
of the original data. When reading Parquet files, all columns are automatically converted to be nullable for
compatibility reasons.

### Loading Data Programmatically

Using the data from the above example:

<div class="codetabs">

<div data-lang="scala" markdown="1">
{% include_example basic_parquet_example scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example basic_parquet_example java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">
{% include_example basic_parquet_example python/sql/datasource.py %}
</div>

<div data-lang="r" markdown="1">
{% include_example basic_parquet_example r/RSparkSQLExample.R %}
</div>

<div data-lang="sql" markdown="1">

{% highlight sql %}
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable
{% endhighlight %}

</div>

</div>

### Partition Discovery

Table partitioning is a common optimization approach used in systems like Hive. In a partitioned
table, data are usually stored in different directories, with partitioning column values encoded in
the path of each partition directory. All built-in file sources (including Text/CSV/JSON/ORC/Parquet)
are able to discover and infer partitioning information automatically.
For example, we can store all our previously used
population data into a partitioned table using the following directory structure, with two extra
columns, `gender` and `country`, as partitioning columns:

{% highlight text %}
path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...
{% endhighlight %}

By passing `path/to/table` to either `SparkSession.read.parquet` or `SparkSession.read.load`, Spark SQL
will automatically extract the partitioning information from the paths.
Now the schema of the returned DataFrame becomes:

{% highlight text %}
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
{% endhighlight %}

Notice that the data types of the partitioning columns are automatically inferred. Currently,
numeric data types, date, timestamp and string type are supported. Sometimes users may not want
to automatically infer the data types of the partitioning columns. For these use cases, the
automatic type inference can be configured by
`spark.sql.sources.partitionColumnTypeInference.enabled`, which defaults to `true`. When type
inference is disabled, string type will be used for the partitioning columns.

Starting from Spark 1.6.0, partition discovery only finds partitions under the given paths
by default. For the above example, if users pass `path/to/table/gender=male` to either
`SparkSession.read.parquet` or `SparkSession.read.load`, `gender` will not be considered as a
partitioning column. If users need to specify the base path that partition discovery
should start with, they can set `basePath` in the data source options. For example,
when `path/to/table/gender=male` is the path of the data and
users set `basePath` to `path/to/table/`, `gender` will be a partitioning column.
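
The effect of `basePath` can be sketched with a small, self-contained Python function (this is an illustration of the idea, not Spark's actual implementation): partition columns are the `key=value` directory segments found below the base path.

```python
import os

def discover_partitions(file_path, base_path):
    """Parse key=value partition directories from a file path, mimicking
    (in simplified form) how partition discovery derives partition column
    values from the directory structure below the base path."""
    rel = os.path.relpath(os.path.dirname(file_path), base_path)
    partitions = {}
    for segment in rel.split(os.sep):
        if "=" in segment:
            key, value = segment.split("=", 1)
            partitions[key] = value
    return partitions

# With basePath set to path/to/table, both directory levels become columns.
print(discover_partitions("path/to/table/gender=male/country=US/data.parquet",
                          "path/to/table"))
# With basePath set to the gender=male subdirectory, gender is not discovered.
print(discover_partitions("path/to/table/gender=male/country=US/data.parquet",
                          "path/to/table/gender=male"))
```
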

### Schema Merging

Like Protocol Buffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with
a simple schema, and gradually add more columns to the schema as needed. In this way, users may end
up with multiple Parquet files with different but mutually compatible schemas. The Parquet data
source is now able to automatically detect this case and merge schemas of all these files.

Since schema merging is a relatively expensive operation, and is not a necessity in most cases, we
turned it off by default starting from 1.5.0. You may enable it by

1. setting data source option `mergeSchema` to `true` when reading Parquet files (as shown in the
   examples below), or
2. setting the global SQL option `spark.sql.parquet.mergeSchema` to `true`.
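
Conceptually, merging mutually compatible schemas produces the union of their columns. A toy Python sketch of that idea (not Spark's implementation, which also reconciles types and nullability):

```python
def merge_schemas(schemas):
    """Merge a sequence of schemas (dicts of column name -> data type)
    into their union, rejecting columns whose types disagree."""
    merged = {}
    for schema in schemas:
        for name, dtype in schema.items():
            if name in merged and merged[name] != dtype:
                raise ValueError(f"incompatible types for column {name!r}")
            merged[name] = dtype
    return merged

# Two files written at different stages of schema evolution share the
# "value" column; the merged schema contains all three columns.
file1 = {"single": "int", "value": "int"}
file2 = {"double": "int", "value": "int"}
print(merge_schemas([file1, file2]))
```
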

<div class="codetabs">

<div data-lang="scala" markdown="1">
{% include_example schema_merging scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example schema_merging java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">
{% include_example schema_merging python/sql/datasource.py %}
</div>

<div data-lang="r" markdown="1">
{% include_example schema_merging r/RSparkSQLExample.R %}
</div>

</div>

### Hive metastore Parquet table conversion

When reading from Hive metastore Parquet tables and writing to non-partitioned Hive metastore
Parquet tables, Spark SQL will try to use its own Parquet support instead of Hive SerDe for
better performance. This behavior is controlled by the `spark.sql.hive.convertMetastoreParquet`
configuration, and is turned on by default.

#### Hive/Parquet Schema Reconciliation

There are two key differences between Hive and Parquet from the perspective of table schema
processing.

1. Hive is case insensitive, while Parquet is not
1. Hive considers all columns nullable, while nullability in Parquet is significant

Due to this reason, we must reconcile Hive metastore schema with Parquet schema when converting a
Hive metastore Parquet table to a Spark SQL Parquet table. The reconciliation rules are:

1. Fields that have the same name in both schemas must have the same data type regardless of
   nullability. The reconciled field should have the data type of the Parquet side, so that
   nullability is respected.

1. The reconciled schema contains exactly those fields defined in the Hive metastore schema.

   - Any fields that only appear in the Parquet schema are dropped in the reconciled schema.
   - Any fields that only appear in the Hive metastore schema are added as nullable fields in the
     reconciled schema.
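
The rules above can be sketched as a short Python function (an illustration of the rules, not Spark's code); here a schema is a dict of column name to `(data_type, nullable)`, and Hive names are matched against Parquet names case-insensitively:

```python
def reconcile_schemas(hive_schema, parquet_schema):
    """Apply the reconciliation rules: keep exactly the Hive columns,
    take type and nullability from Parquet where a (case-insensitive)
    match exists, and mark Hive-only columns nullable."""
    parquet_by_lower = {name.lower(): (dtype, nullable)
                        for name, (dtype, nullable) in parquet_schema.items()}
    reconciled = {}
    for hive_name, (hive_type, _) in hive_schema.items():
        match = parquet_by_lower.get(hive_name.lower())
        if match is not None:
            # Field present on both sides: Parquet type and nullability win.
            reconciled[hive_name] = match
        else:
            # Hive-only field: added as nullable.
            reconciled[hive_name] = (hive_type, True)
    # Parquet-only fields are dropped implicitly.
    return reconciled

hive = {"name": ("string", True), "age": ("long", True), "extra": ("int", True)}
parquet = {"Name": ("string", False), "age": ("long", True), "internal": ("int", False)}
print(reconcile_schemas(hive, parquet))
```
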

#### Metadata Refreshing

Spark SQL caches Parquet metadata for better performance. When Hive metastore Parquet table
conversion is enabled, metadata of those converted tables is also cached. If these tables are
updated by Hive or other external tools, you need to refresh them manually to ensure consistent
metadata.

<div class="codetabs">

<div data-lang="scala" markdown="1">

{% highlight scala %}
// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
{% endhighlight %}

</div>

<div data-lang="java" markdown="1">

{% highlight java %}
// spark is an existing SparkSession
spark.catalog().refreshTable("my_table");
{% endhighlight %}

</div>

<div data-lang="python" markdown="1">

{% highlight python %}
# spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
{% endhighlight %}

</div>

<div data-lang="r" markdown="1">

{% highlight r %}
refreshTable("my_table")
{% endhighlight %}

</div>

<div data-lang="sql" markdown="1">

{% highlight sql %}
REFRESH TABLE my_table;
{% endhighlight %}

</div>

</div>

### Configuration

Configuration of Parquet can be done using the `setConf` method on `SparkSession` or by running
`SET key=value` commands using SQL.
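
For example, the following SQL commands (with illustrative values) adjust two of the properties described in the table below:

{% highlight sql %}
-- Enable schema merging globally for Parquet reads
SET spark.sql.parquet.mergeSchema=true;

-- Change the compression codec used when writing Parquet files
SET spark.sql.parquet.compression.codec=gzip;
{% endhighlight %}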

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
  <td><code>spark.sql.parquet.binaryAsString</code></td>
  <td>false</td>
  <td>
    Some other Parquet-producing systems, in particular Impala, Hive, and older versions of Spark SQL, do
    not differentiate between binary data and strings when writing out the Parquet schema. This
    flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
  </td>
</tr>
<tr>
  <td><code>spark.sql.parquet.int96AsTimestamp</code></td>
  <td>true</td>
  <td>
    Some Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This
    flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
  </td>
</tr>
<tr>
  <td><code>spark.sql.parquet.compression.codec</code></td>
  <td>snappy</td>
  <td>
    Sets the compression codec used when writing Parquet files. If either <code>compression</code> or
    <code>parquet.compression</code> is specified in the table-specific options/properties, the precedence would be
    <code>compression</code>, <code>parquet.compression</code>, <code>spark.sql.parquet.compression.codec</code>.
    Acceptable values include: none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
    Note that <code>zstd</code> requires <code>ZStandardCodec</code> to be installed before Hadoop 2.9.0, and
    <code>brotli</code> requires <code>BrotliCodec</code> to be installed.
  </td>
</tr>
<tr>
  <td><code>spark.sql.parquet.filterPushdown</code></td>
  <td>true</td>
  <td>Enables Parquet filter push-down optimization when set to true.</td>
</tr>
<tr>
  <td><code>spark.sql.hive.convertMetastoreParquet</code></td>
  <td>true</td>
  <td>
    When set to false, Spark SQL will use the Hive SerDe for Parquet tables instead of the built-in
    support.
  </td>
</tr>
<tr>
  <td><code>spark.sql.parquet.mergeSchema</code></td>
  <td>false</td>
  <td>
    <p>
      When true, the Parquet data source merges schemas collected from all data files, otherwise the
      schema is picked from the summary file or a random data file if no summary file is available.
    </p>
  </td>
</tr>
<tr>
  <td><code>spark.sql.parquet.writeLegacyFormat</code></td>
  <td>false</td>
  <td>
    If true, data will be written in the way of Spark 1.4 and earlier. For example, decimal values
    will be written in Apache Parquet's fixed-length byte array format, which other systems such as
    Apache Hive and Apache Impala use. If false, the newer format in Parquet will be used. For
    example, decimals will be written in int-based format. If Parquet output is intended for use
    with systems that do not support this newer format, set to true.
  </td>
</tr>
</table>