spark-instrumented-optimizer/docs/sql-data-sources-generic-options.md
CC Highman d338af3101 [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source
### What changes were proposed in this pull request?

Two new options, _modifiiedBefore_  and _modifiedAfter_, is provided expecting a value in 'YYYY-MM-DDTHH:mm:ss' format.  _PartioningAwareFileIndex_ considers these options during the process of checking for files, just before considering applied _PathFilters_ such as `pathGlobFilter.`  In order to filter file results, a new PathFilter class was derived for this purpose.  General house-keeping around classes extending PathFilter was performed for neatness.  It became apparent support was needed to handle multiple potential path filters.  Logic was introduced for this purpose and the associated tests written.

### Why are the changes needed?

When loading files from a data source, there can often times be thousands of file within a respective file path.  In many cases I've seen, we want to start loading from a folder path and ideally be able to begin loading files having modification dates past a certain point.  This would mean out of thousands of potential files, only the ones with modification dates greater than the specified timestamp would be considered.  This saves a ton of time automatically and reduces significant complexity managing this in code.

### Does this PR introduce _any_ user-facing change?

This PR introduces an option that can be used with batch-based Spark file data sources.  A documentation update was made to reflect an example and usage of the new data source option.

**Example Usages**
_Load all CSV files modified after date:_
`spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load()`

_Load all CSV files modified before date:_
`spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load()`

_Load all CSV files modified between two dates:_
`spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00").option("modifiedBefore","2020-06-15T05:00:00").load()
`

### How was this patch tested?

A handful of unit tests were added to support the positive, negative, and edge case code paths.

It's also live in a handful of our Databricks dev environments.  (quoted from cchighman)

Closes #30411 from HeartSaVioR/SPARK-31962.

Lead-authored-by: CC Highman <christopher.highman@microsoft.com>
Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
2020-11-23 08:30:41 +09:00

158 lines
6 KiB
Markdown

---
layout: global
title: Generic File Source Options
displayTitle: Generic File Source Options
license: |
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---
* Table of contents
{:toc}
These generic options/configurations are effective only when using file-based sources: parquet, orc, avro, json, csv, text.
Please note that the hierarchy of directories used in examples below are:
{% highlight text %}
dir1/
├── dir2/
│ └── file2.parquet (schema: <file: string>, content: "file2.parquet")
└── file1.parquet (schema: <file, string>, content: "file1.parquet")
└── file3.json (schema: <file, string>, content: "{'file':'corrupt.json'}")
{% endhighlight %}
### Ignore Corrupt Files
Spark allows you to use `spark.sql.files.ignoreCorruptFiles` to ignore corrupt files while reading data
from files. When set to true, the Spark jobs will continue to run when encountering corrupted files and
the contents that have been read will still be returned.
To ignore corrupt files while reading data files, you can use:
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% include_example ignore_corrupt_files scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>
<div data-lang="java" markdown="1">
{% include_example ignore_corrupt_files java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>
<div data-lang="python" markdown="1">
{% include_example ignore_corrupt_files python/sql/datasource.py %}
</div>
<div data-lang="r" markdown="1">
{% include_example ignore_corrupt_files r/RSparkSQLExample.R %}
</div>
</div>
### Ignore Missing Files
Spark allows you to use `spark.sql.files.ignoreMissingFiles` to ignore missing files while reading data
from files. Here, missing file really means the deleted file under directory after you construct the
`DataFrame`. When set to true, the Spark jobs will continue to run when encountering missing files and
the contents that have been read will still be returned.
### Path Global Filter
`pathGlobFilter` is used to only include files with file names matching the pattern.
The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
It does not change the behavior of partition discovery.
To load files with paths matching a given glob pattern while keeping the behavior of partition discovery,
you can use:
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% include_example load_with_path_glob_filter scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>
<div data-lang="java" markdown="1">
{% include_example load_with_path_glob_filter java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>
<div data-lang="python" markdown="1">
{% include_example load_with_path_glob_filter python/sql/datasource.py %}
</div>
<div data-lang="r" markdown="1">
{% include_example load_with_path_glob_filter r/RSparkSQLExample.R %}
</div>
</div>
### Recursive File Lookup
`recursiveFileLookup` is used to recursively load files and it disables partition inferring. Its default value is `false`.
If data source explicitly specifies the `partitionSpec` when `recursiveFileLookup` is true, exception will be thrown.
To load all files recursively, you can use:
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% include_example recursive_file_lookup scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>
<div data-lang="java" markdown="1">
{% include_example recursive_file_lookup java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>
<div data-lang="python" markdown="1">
{% include_example recursive_file_lookup python/sql/datasource.py %}
</div>
<div data-lang="r" markdown="1">
{% include_example recursive_file_lookup r/RSparkSQLExample.R %}
</div>
</div>
### Modification Time Path Filters
`modifiedBefore` and `modifiedAfter` are options that can be
applied together or separately in order to achieve greater
granularity over which files may load during a Spark batch query.
(Note that Structured Streaming file sources don't support these options.)
* `modifiedBefore`: an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
* `modifiedAfter`: an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
When a timezone option is not provided, the timestamps will be interpreted according
to the Spark session timezone (`spark.sql.session.timeZone`).
To load files with paths matching a given modified time range, you can use:
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% include_example load_with_modified_time_filter scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>
<div data-lang="java" markdown="1">
{% include_example load_with_modified_time_filter java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>
<div data-lang="python" markdown="1">
{% include_example load_with_modified_time_filter python/sql/datasource.py %}
</div>
<div data-lang="r" markdown="1">
{% include_example load_with_modified_time_filter r/RSparkSQLExample.R %}
</div>
</div>