[SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source

### What changes were proposed in this pull request?

Two new options, _modifiedBefore_ and _modifiedAfter_, are provided, each expecting a value in 'YYYY-MM-DDTHH:mm:ss' format.  _PartitioningAwareFileIndex_ considers these options while checking for files, just before applying _PathFilters_ such as `pathGlobFilter`.  In order to filter file results, a new PathFilter class was derived for this purpose.  General housekeeping around classes extending PathFilter was performed for neatness.  It became apparent that support was needed to handle multiple potential path filters, so logic was introduced for this purpose and the associated tests were written.
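
At a high level, each recognized option becomes a path-filter strategy applied to every file's `FileStatus` during listing, and all active strategies must accept a file for it to be loaded. A minimal sketch of that composition idea (names here are illustrative only; the real implementation is the new `PathFilters.scala` in this diff):

```scala
import org.apache.hadoop.fs.FileStatus

// Illustrative sketch, not the PR's actual classes.
trait FileFilter { def accept(file: FileStatus): Boolean }

// Keep files whose modification time (milliseconds) is strictly after a threshold.
case class ModifiedAfterSketch(thresholdMs: Long) extends FileFilter {
  override def accept(file: FileStatus): Boolean = file.getModificationTime > thresholdMs
}

// Filters built from the supplied options (e.g. pathGlobFilter, modifiedAfter, modifiedBefore)
// are combined as a conjunction: a file is listed only if every filter accepts it.
def acceptAll(filters: Seq[FileFilter], file: FileStatus): Boolean =
  filters.forall(_.accept(file))
```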

### Why are the changes needed?

When loading files from a data source, there can often be thousands of files within a given path.  In many cases we want to start loading from a folder path and only pick up files whose modification dates fall after a certain point.  Out of thousands of potential files, only the ones with modification dates greater than the specified timestamp would be considered.  This saves a great deal of time automatically and removes significant complexity that would otherwise have to be managed in code.

### Does this PR introduce _any_ user-facing change?

This PR introduces two options that can be used with batch-based Spark file data sources.  A documentation update was made to reflect examples and usage of the new data source options.

**Example Usages**
_Load all CSV files modified after date:_
`spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load()`

_Load all CSV files modified before date:_
`spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load()`

_Load all CSV files modified between two dates:_
`spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00").option("modifiedBefore","2020-06-15T05:00:00").load()
`
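
_The timestamps above are interpreted in the Spark session time zone (`spark.sql.session.timeZone`) unless a `timeZone` option is also supplied, as in this illustrative example:_
`spark.read.format("csv").option("timeZone","UTC").option("modifiedAfter","2020-06-15T05:00:00").load()`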

### How was this patch tested?

A handful of unit tests were added to support the positive, negative, and edge case code paths.

It's also live in a handful of our Databricks dev environments.  (quoted from cchighman)

Closes #30411 from HeartSaVioR/SPARK-31962.

Lead-authored-by: CC Highman <christopher.highman@microsoft.com>
Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
14 changed files with 787 additions and 48 deletions

@ -119,3 +119,40 @@ To load all files recursively, you can use:
{% include_example recursive_file_lookup r/RSparkSQLExample.R %}
</div>
</div>
### Modification Time Path Filters
`modifiedBefore` and `modifiedAfter` are options that can be
applied together or separately in order to achieve greater
granularity over which files may load during a Spark batch query.
(Note that Structured Streaming file sources don't support these options.)
* `modifiedBefore`: an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
* `modifiedAfter`: an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
When a timezone option is not provided, the timestamps will be interpreted according
to the Spark session timezone (`spark.sql.session.timeZone`).
To load files with paths matching a given modified time range, you can use:
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% include_example load_with_modified_time_filter scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>
<div data-lang="java" markdown="1">
{% include_example load_with_modified_time_filter java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>
<div data-lang="python" markdown="1">
{% include_example load_with_modified_time_filter python/sql/datasource.py %}
</div>
<div data-lang="r" markdown="1">
{% include_example load_with_modified_time_filter r/RSparkSQLExample.R %}
</div>
</div>

@ -147,6 +147,22 @@ public class JavaSQLDataSourceExample {
// |file1.parquet|
// +-------------+
// $example off:load_with_path_glob_filter$
// $example on:load_with_modified_time_filter$
Dataset<Row> beforeFilterDF = spark.read().format("parquet")
// Only load files modified before 7/1/2020 at 05:30
.option("modifiedBefore", "2020-07-01T05:30:00")
// Only load files modified after 6/1/2020 at 05:30
.option("modifiedAfter", "2020-06-01T05:30:00")
// Interpret both times above relative to CST timezone
.option("timeZone", "CST")
.load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
// $example off:load_with_modified_time_filter$
}
private static void runBasicDataSourceExample(SparkSession spark) {

@ -67,6 +67,26 @@ def generic_file_source_options_example(spark):
# +-------------+
# $example off:load_with_path_glob_filter$
# $example on:load_with_modified_time_filter$
# Only load files modified before 07/1/2050 @ 08:30:00
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", modifiedBefore="2050-07-01T08:30:00")
df.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# +-------------+
# Only load files modified after 06/01/2050 @ 08:30:00
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", modifiedAfter="2050-06-01T08:30:00")
df.show()
# +-------------+
# | file|
# +-------------+
# +-------------+
# $example off:load_with_modified_time_filter$
def basic_datasource_example(spark):
# $example on:generic_load_save_functions$

@ -144,6 +144,14 @@ df <- read.df("examples/src/main/resources/dir1", "parquet", pathGlobFilter = "*
# 1 file1.parquet
# $example off:load_with_path_glob_filter$
# $example on:load_with_modified_time_filter$
beforeDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedBefore= "2020-07-01T05:30:00")
# file
# 1 file1.parquet
afterDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedAfter = "2020-06-01T05:30:00")
# file
# $example off:load_with_modified_time_filter$
# $example on:manual_save_options_orc$
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")

@ -81,6 +81,27 @@ object SQLDataSourceExample {
// |file1.parquet|
// +-------------+
// $example off:load_with_path_glob_filter$
// $example on:load_with_modified_time_filter$
val beforeFilterDF = spark.read.format("parquet")
// Files modified before 07/01/2020 at 05:30 are allowed
.option("modifiedBefore", "2020-07-01T05:30:00")
.load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
val afterFilterDF = spark.read.format("parquet")
// Files modified after 06/01/2020 at 05:30 are allowed
.option("modifiedAfter", "2020-06-01T05:30:00")
.load("examples/src/main/resources/dir1");
afterFilterDF.show();
// +-------------+
// | file|
// +-------------+
// +-------------+
// $example off:load_with_modified_time_filter$
}
private def runBasicDataSourceExample(spark: SparkSession): Unit = {

@ -125,6 +125,12 @@ class DataFrameReader(OptionUtils):
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.
* ``modifiedBefore``: an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
* ``modifiedAfter``: an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
""" """
self._jreader = self._jreader.option(key, to_str(value)) self._jreader = self._jreader.option(key, to_str(value))
return self return self
@ -149,6 +155,12 @@ class DataFrameReader(OptionUtils):
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.
* ``modifiedBefore``: an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
* ``modifiedAfter``: an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
""" """
for k in options: for k in options:
self._jreader = self._jreader.option(k, to_str(options[k])) self._jreader = self._jreader.option(k, to_str(options[k]))
@ -203,7 +215,8 @@ class DataFrameReader(OptionUtils):
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None,
dropFieldIfAllNull=None, encoding=None, locale=None, pathGlobFilter=None,
recursiveFileLookup=None, allowNonNumericNumbers=None,
modifiedBefore=None, modifiedAfter=None):
""" """
Loads JSON files and returns the results as a :class:`DataFrame`. Loads JSON files and returns the results as a :class:`DataFrame`.
@ -322,6 +335,13 @@ class DataFrameReader(OptionUtils):
``+Infinity`` and ``Infinity``.
* ``-INF``: for negative infinity, alias ``-Infinity``.
* ``NaN``: for other not-a-numbers, like result of division by zero.
modifiedBefore : an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
modifiedAfter : an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
Examples
--------
@ -344,6 +364,7 @@ class DataFrameReader(OptionUtils):
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
locale=locale, pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter,
allowNonNumericNumbers=allowNonNumericNumbers)
if isinstance(path, str):
path = [path]
@ -410,6 +431,15 @@ class DataFrameReader(OptionUtils):
disables
`partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_. # noqa
modifiedBefore (batch only) : an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
modifiedAfter (batch only) : an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
Examples
--------
>>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
@ -418,13 +448,18 @@ class DataFrameReader(OptionUtils):
""" """
mergeSchema = options.get('mergeSchema', None) mergeSchema = options.get('mergeSchema', None)
pathGlobFilter = options.get('pathGlobFilter', None) pathGlobFilter = options.get('pathGlobFilter', None)
modifiedBefore = options.get('modifiedBefore', None)
modifiedAfter = options.get('modifiedAfter', None)
recursiveFileLookup = options.get('recursiveFileLookup', None)
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
recursiveFileLookup=recursiveFileLookup, modifiedBefore=modifiedBefore,
modifiedAfter=modifiedAfter)
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,
recursiveFileLookup=None, modifiedBefore=None,
modifiedAfter=None):
""" """
Loads text files and returns a :class:`DataFrame` whose schema starts with a Loads text files and returns a :class:`DataFrame` whose schema starts with a
string column named "value", and followed by partitioned columns if there string column named "value", and followed by partitioned columns if there
@ -453,6 +488,15 @@ class DataFrameReader(OptionUtils):
recursively scan a directory for files. Using this option disables
`partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_. # noqa
modifiedBefore (batch only) : an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
modifiedAfter (batch only) : an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
Examples
--------
>>> df = spark.read.text('python/test_support/sql/text-test.txt')
@ -464,7 +508,9 @@ class DataFrameReader(OptionUtils):
""" """
self._set_opts( self._set_opts(
wholetext=wholetext, lineSep=lineSep, pathGlobFilter=pathGlobFilter, wholetext=wholetext, lineSep=lineSep, pathGlobFilter=pathGlobFilter,
recursiveFileLookup=recursiveFileLookup) recursiveFileLookup=recursiveFileLookup, modifiedBefore=modifiedBefore,
modifiedAfter=modifiedAfter)
if isinstance(paths, str):
paths = [paths]
return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
@ -476,7 +522,7 @@ class DataFrameReader(OptionUtils):
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None):
r"""Loads a CSV file and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@ -631,6 +677,15 @@ class DataFrameReader(OptionUtils):
recursively scan a directory for files. Using this option disables
`partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_. # noqa
modifiedBefore (batch only) : an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
modifiedAfter (batch only) : an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
Examples
--------
>>> df = spark.read.csv('python/test_support/sql/ages.csv')
@ -652,7 +707,8 @@ class DataFrameReader(OptionUtils):
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter)
if isinstance(path, str):
path = [path]
if type(path) == list:
@ -679,7 +735,8 @@ class DataFrameReader(OptionUtils):
else:
raise TypeError("path can be only string, list or RDD")
def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None,
modifiedBefore=None, modifiedAfter=None):
"""Loads ORC files, returning the result as a :class:`DataFrame`. """Loads ORC files, returning the result as a :class:`DataFrame`.
.. versionadded:: 1.5.0 .. versionadded:: 1.5.0
@ -701,6 +758,15 @@ class DataFrameReader(OptionUtils):
disables
`partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_. # noqa
modifiedBefore : an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
modifiedAfter : an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
Examples
--------
>>> df = spark.read.orc('python/test_support/sql/orc_partitioned')
@ -708,6 +774,7 @@ class DataFrameReader(OptionUtils):
[('a', 'bigint'), ('b', 'int'), ('c', 'int')]
"""
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter,
recursiveFileLookup=recursiveFileLookup)
if isinstance(path, str):
path = [path]

@ -493,6 +493,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>. * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
* It does not change the behavior of partition discovery.</li> * It does not change the behavior of partition discovery.</li>
* <li>`modifiedBefore` (batch only): an optional timestamp to only include files with
* modification times occurring before the specified Time. The provided timestamp
* must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)</li>
* <li>`modifiedAfter` (batch only): an optional timestamp to only include files with
* modification times occurring after the specified Time. The provided timestamp
* must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)</li>
* <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option * <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery</li> * disables partition discovery</li>
* <li>`allowNonNumericNumbers` (default `true`): allows JSON parser to recognize set of * <li>`allowNonNumericNumbers` (default `true`): allows JSON parser to recognize set of
@ -750,6 +756,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>. * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
* It does not change the behavior of partition discovery.</li> * It does not change the behavior of partition discovery.</li>
* <li>`modifiedBefore` (batch only): an optional timestamp to only include files with
* modification times occurring before the specified Time. The provided timestamp
* must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)</li>
* <li>`modifiedAfter` (batch only): an optional timestamp to only include files with
* modification times occurring after the specified Time. The provided timestamp
* must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)</li>
* <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option * <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery</li> * disables partition discovery</li>
* </ul> * </ul>
@ -781,6 +793,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>. * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
* It does not change the behavior of partition discovery.</li> * It does not change the behavior of partition discovery.</li>
* <li>`modifiedBefore` (batch only): an optional timestamp to only include files with
* modification times occurring before the specified Time. The provided timestamp
* must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)</li>
* <li>`modifiedAfter` (batch only): an optional timestamp to only include files with
* modification times occurring after the specified Time. The provided timestamp
* must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)</li>
* <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option * <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery</li> * disables partition discovery</li>
* </ul> * </ul>
@ -814,6 +832,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>. * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
* It does not change the behavior of partition discovery.</li> * It does not change the behavior of partition discovery.</li>
* <li>`modifiedBefore` (batch only): an optional timestamp to only include files with
* modification times occurring before the specified Time. The provided timestamp
* must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)</li>
* <li>`modifiedAfter` (batch only): an optional timestamp to only include files with
* modification times occurring after the specified Time. The provided timestamp
* must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)</li>
* <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option * <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery</li> * disables partition discovery</li>
* </ul> * </ul>
@ -880,6 +904,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>. * the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
* It does not change the behavior of partition discovery.</li> * It does not change the behavior of partition discovery.</li>
* <li>`modifiedBefore` (batch only): an optional timestamp to only include files with
* modification times occurring before the specified Time. The provided timestamp
* must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)</li>
* <li>`modifiedAfter` (batch only): an optional timestamp to only include files with
* modification times occurring after the specified Time. The provided timestamp
* must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)</li>
* <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option * <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery</li> * disables partition discovery</li>
* </ul> * </ul>

@ -57,13 +57,10 @@ abstract class PartitioningAwareFileIndex(
protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
private val caseInsensitiveMap = CaseInsensitiveMap(parameters)
private val pathFilters = PathFilterFactory.create(caseInsensitiveMap)
protected def matchPathPattern(file: FileStatus): Boolean =
pathFilters.forall(_.accept(file))
protected lazy val recursiveFileLookup: Boolean = {
caseInsensitiveMap.getOrElse("recursiveFileLookup", "false").toBoolean
@ -86,7 +83,7 @@ abstract class PartitioningAwareFileIndex(
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
case Some(existingDir) =>
// Directory has children files in it, return them
existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f))
case None =>
// Directory does not exist, or has no children files
@ -135,7 +132,7 @@ abstract class PartitioningAwareFileIndex(
} else {
leafFiles.values.toSeq
}
files.filter(matchPathPattern)
}
protected def inferPartitioning(): PartitionSpec = {

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources
import java.util.{Locale, TimeZone}
import org.apache.hadoop.fs.{FileStatus, GlobFilter}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.unsafe.types.UTF8String
trait PathFilterStrategy extends Serializable {
def accept(fileStatus: FileStatus): Boolean
}
trait StrategyBuilder {
def create(parameters: CaseInsensitiveMap[String]): Option[PathFilterStrategy]
}
class PathGlobFilter(filePatten: String) extends PathFilterStrategy {
private val globFilter = new GlobFilter(filePatten)
override def accept(fileStatus: FileStatus): Boolean =
globFilter.accept(fileStatus.getPath)
}
object PathGlobFilter extends StrategyBuilder {
val PARAM_NAME = "pathglobfilter"
override def create(parameters: CaseInsensitiveMap[String]): Option[PathFilterStrategy] = {
parameters.get(PARAM_NAME).map(new PathGlobFilter(_))
}
}
/**
* Provide modifiedAfter and modifiedBefore options when
* filtering from a batch-based file data source.
*
* Example Usages
* Load all CSV files modified after date:
* {{{
* spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load()
* }}}
*
* Load all CSV files modified before date:
* {{{
* spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load()
* }}}
*
* Load all CSV files modified between two dates:
* {{{
* spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00")
* .option("modifiedBefore","2020-06-15T05:00:00").load()
* }}}
*/
abstract class ModifiedDateFilter extends PathFilterStrategy {
def timeZoneId: String
protected def localTime(micros: Long): Long =
DateTimeUtils.fromUTCTime(micros, timeZoneId)
}
object ModifiedDateFilter {
def getTimeZoneId(options: CaseInsensitiveMap[String]): String = {
options.getOrElse(
DateTimeUtils.TIMEZONE_OPTION.toLowerCase(Locale.ROOT),
SQLConf.get.sessionLocalTimeZone)
}
def toThreshold(timeString: String, timeZoneId: String, strategy: String): Long = {
val timeZone: TimeZone = DateTimeUtils.getTimeZone(timeZoneId)
val ts = UTF8String.fromString(timeString)
DateTimeUtils.stringToTimestamp(ts, timeZone.toZoneId).getOrElse {
throw new AnalysisException(
s"The timestamp provided for the '$strategy' option is invalid. The expected format " +
s"is 'YYYY-MM-DDTHH:mm:ss', but the provided timestamp: $timeString")
}
}
}
/**
* Filter used to determine whether file was modified before the provided timestamp.
*/
class ModifiedBeforeFilter(thresholdTime: Long, val timeZoneId: String)
extends ModifiedDateFilter {
override def accept(fileStatus: FileStatus): Boolean =
// We standardize on microseconds wherever possible
// getModificationTime returns in milliseconds
thresholdTime - localTime(DateTimeUtils.millisToMicros(fileStatus.getModificationTime)) > 0
}
object ModifiedBeforeFilter extends StrategyBuilder {
import ModifiedDateFilter._
val PARAM_NAME = "modifiedbefore"
override def create(parameters: CaseInsensitiveMap[String]): Option[PathFilterStrategy] = {
parameters.get(PARAM_NAME).map { value =>
val timeZoneId = getTimeZoneId(parameters)
val thresholdTime = toThreshold(value, timeZoneId, PARAM_NAME)
new ModifiedBeforeFilter(thresholdTime, timeZoneId)
}
}
}
/**
* Filter used to determine whether file was modified after the provided timestamp.
*/
class ModifiedAfterFilter(thresholdTime: Long, val timeZoneId: String)
extends ModifiedDateFilter {
override def accept(fileStatus: FileStatus): Boolean =
// getModificationTime returns in milliseconds
// We standardize on microseconds wherever possible
localTime(DateTimeUtils.millisToMicros(fileStatus.getModificationTime)) - thresholdTime > 0
}
object ModifiedAfterFilter extends StrategyBuilder {
import ModifiedDateFilter._
val PARAM_NAME = "modifiedafter"
override def create(parameters: CaseInsensitiveMap[String]): Option[PathFilterStrategy] = {
parameters.get(PARAM_NAME).map { value =>
val timeZoneId = getTimeZoneId(parameters)
val thresholdTime = toThreshold(value, timeZoneId, PARAM_NAME)
new ModifiedAfterFilter(thresholdTime, timeZoneId)
}
}
}
object PathFilterFactory {
private val strategies =
Seq(PathGlobFilter, ModifiedBeforeFilter, ModifiedAfterFilter)
def create(parameters: CaseInsensitiveMap[String]): Seq[PathFilterStrategy] = {
strategies.flatMap { _.create(parameters) }
}
}

@ -23,6 +23,7 @@ import scala.util.Try
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.{ModifiedAfterFilter, ModifiedBeforeFilter}
import org.apache.spark.util.Utils
/**
@ -32,6 +33,16 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
checkDisallowedOptions(parameters)
private def checkDisallowedOptions(options: Map[String, String]): Unit = {
Seq(ModifiedBeforeFilter.PARAM_NAME, ModifiedAfterFilter.PARAM_NAME).foreach { param =>
if (parameters.contains(param)) {
throw new IllegalArgumentException(s"option '$param' is not allowed in file stream sources")
}
}
}
val maxFilesPerTrigger: Option[Int] = parameters.get("maxFilesPerTrigger").map { str =>
Try(str.toInt).toOption.filter(_ > 0).getOrElse {
throw new IllegalArgumentException(

@ -577,38 +577,6 @@ class FileBasedDataSourceSuite extends QueryTest
}
}
test("Option pathGlobFilter: filter files correctly") {
withTempPath { path =>
val dataDir = path.getCanonicalPath
Seq("foo").toDS().write.text(dataDir)
Seq("bar").toDS().write.mode("append").orc(dataDir)
val df = spark.read.option("pathGlobFilter", "*.txt").text(dataDir)
checkAnswer(df, Row("foo"))
// Both glob pattern in option and path should be effective to filter files.
val df2 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*.orc")
checkAnswer(df2, Seq.empty)
val df3 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*xt")
checkAnswer(df3, Row("foo"))
}
}
test("Option pathGlobFilter: simple extension filtering should contains partition info") {
withTempPath { path =>
val input = Seq(("foo", 1), ("oof", 2)).toDF("a", "b")
input.write.partitionBy("b").text(path.getCanonicalPath)
Seq("bar").toDS().write.mode("append").orc(path.getCanonicalPath + "/b=1")
// If we use glob pattern in the path, the partition column won't be shown in the result.
val df = spark.read.text(path.getCanonicalPath + "/*/*.txt")
checkAnswer(df, input.select("a"))
val df2 = spark.read.option("pathGlobFilter", "*.txt").text(path.getCanonicalPath)
checkAnswer(df2, input)
}
}
test("Option recursiveFileLookup: recursive loading correctly") { test("Option recursiveFileLookup: recursive loading correctly") {
val expectedFileList = mutable.ListBuffer[String]() val expectedFileList = mutable.ListBuffer[String]()

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.test.SharedSparkSession
class PathFilterStrategySuite extends QueryTest with SharedSparkSession {
test("SPARK-31962: PathFilterStrategies - modifiedAfter option") {
val options =
CaseInsensitiveMap[String](Map("modifiedAfter" -> "2010-10-01T01:01:00"))
val strategy = PathFilterFactory.create(options)
assert(strategy.head.isInstanceOf[ModifiedAfterFilter])
assert(strategy.size == 1)
}
test("SPARK-31962: PathFilterStrategies - modifiedBefore option") {
val options =
CaseInsensitiveMap[String](Map("modifiedBefore" -> "2020-10-01T01:01:00"))
val strategy = PathFilterFactory.create(options)
assert(strategy.head.isInstanceOf[ModifiedBeforeFilter])
assert(strategy.size == 1)
}
test("SPARK-31962: PathFilterStrategies - pathGlobFilter option") {
val options = CaseInsensitiveMap[String](Map("pathGlobFilter" -> "*.txt"))
val strategy = PathFilterFactory.create(options)
assert(strategy.head.isInstanceOf[PathGlobFilter])
assert(strategy.size == 1)
}
test("SPARK-31962: PathFilterStrategies - no options") {
val options = CaseInsensitiveMap[String](Map.empty)
val strategy = PathFilterFactory.create(options)
assert(strategy.isEmpty)
}
}

@ -0,0 +1,307 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources
import java.io.File
import java.time.{LocalDateTime, ZoneId, ZoneOffset}
import java.time.format.DateTimeFormatter
import scala.util.Random
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
class PathFilterSuite extends QueryTest with SharedSparkSession {
import testImplicits._
test("SPARK-31962: modifiedBefore specified" +
" and sharing same timestamp with file last modified time.") {
withTempDir { dir =>
val curTime = LocalDateTime.now(ZoneOffset.UTC)
executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formatTime(curTime)))
}
}
test("SPARK-31962: modifiedAfter specified" +
" and sharing same timestamp with file last modified time.") {
withTempDir { dir =>
val curTime = LocalDateTime.now(ZoneOffset.UTC)
executeTest(dir, Seq(curTime), 0, modifiedAfter = Some(formatTime(curTime)))
}
}
test("SPARK-31962: modifiedBefore and modifiedAfter option" +
" share same timestamp with file last modified time.") {
withTempDir { dir =>
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val formattedTime = formatTime(curTime)
executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formattedTime),
modifiedAfter = Some(formattedTime))
}
}
test("SPARK-31962: modifiedBefore and modifiedAfter option" +
" share same timestamp with earlier file last modified time.") {
withTempDir { dir =>
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val fileTime = curTime.minusDays(3)
val formattedTime = formatTime(curTime)
executeTest(dir, Seq(fileTime), 0, modifiedBefore = Some(formattedTime),
modifiedAfter = Some(formattedTime))
}
}
test("SPARK-31962: modifiedBefore and modifiedAfter option" +
" share same timestamp with later file last modified time.") {
withTempDir { dir =>
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val formattedTime = formatTime(curTime)
executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formattedTime),
modifiedAfter = Some(formattedTime))
}
}
test("SPARK-31962: when modifiedAfter specified with a past date") {
withTempDir { dir =>
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val pastTime = curTime.minusYears(1)
val formattedTime = formatTime(pastTime)
executeTest(dir, Seq(curTime), 1, modifiedAfter = Some(formattedTime))
}
}
test("SPARK-31962: when modifiedBefore specified with a future date") {
withTempDir { dir =>
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val futureTime = curTime.plusYears(1)
val formattedTime = formatTime(futureTime)
executeTest(dir, Seq(curTime), 1, modifiedBefore = Some(formattedTime))
}
}
test("SPARK-31962: with modifiedBefore option provided using a past date") {
withTempDir { dir =>
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val pastTime = curTime.minusYears(1)
val formattedTime = formatTime(pastTime)
executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formattedTime))
}
}
test("SPARK-31962: modifiedAfter specified with a past date, multiple files, one valid") {
withTempDir { dir =>
val fileTime1 = LocalDateTime.now(ZoneOffset.UTC)
val fileTime2 = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)
val pastTime = fileTime1.minusYears(1)
val formattedTime = formatTime(pastTime)
executeTest(dir, Seq(fileTime1, fileTime2), 1, modifiedAfter = Some(formattedTime))
}
}
test("SPARK-31962: modifiedAfter specified with a past date, multiple files, both valid") {
withTempDir { dir =>
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val pastTime = curTime.minusYears(1)
val formattedTime = formatTime(pastTime)
executeTest(dir, Seq(curTime, curTime), 2, modifiedAfter = Some(formattedTime))
}
}
test("SPARK-31962: modifiedAfter specified with a past date, multiple files, none valid") {
withTempDir { dir =>
val fileTime = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)
val pastTime = LocalDateTime.now(ZoneOffset.UTC).minusYears(1)
val formattedTime = formatTime(pastTime)
executeTest(dir, Seq(fileTime, fileTime), 0, modifiedAfter = Some(formattedTime))
}
}
test("SPARK-31962: modifiedBefore specified with a future date, multiple files, both valid") {
withTempDir { dir =>
val fileTime = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)
val futureTime = LocalDateTime.now(ZoneOffset.UTC).plusYears(1)
val formattedTime = formatTime(futureTime)
executeTest(dir, Seq(fileTime, fileTime), 2, modifiedBefore = Some(formattedTime))
}
}
test("SPARK-31962: modifiedBefore specified with a future date, multiple files, one valid") {
withTempDir { dir =>
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val fileTime1 = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)
val fileTime2 = curTime.plusDays(3)
val formattedTime = formatTime(curTime)
executeTest(dir, Seq(fileTime1, fileTime2), 1, modifiedBefore = Some(formattedTime))
}
}
test("SPARK-31962: modifiedBefore specified with a future date, multiple files, none valid") {
withTempDir { dir =>
val fileTime = LocalDateTime.now(ZoneOffset.UTC).minusDays(1)
val formattedTime = formatTime(fileTime)
executeTest(dir, Seq(fileTime, fileTime), 0, modifiedBefore = Some(formattedTime))
}
}
test("SPARK-31962: modifiedBefore/modifiedAfter is specified with an invalid date") {
executeTestWithBadOption(
Map("modifiedBefore" -> "2024-05+1 01:00:00"),
Seq("The timestamp provided", "modifiedbefore", "2024-05+1 01:00:00"))
executeTestWithBadOption(
Map("modifiedAfter" -> "2024-05+1 01:00:00"),
Seq("The timestamp provided", "modifiedafter", "2024-05+1 01:00:00"))
}
test("SPARK-31962: modifiedBefore/modifiedAfter - empty option") {
executeTestWithBadOption(
Map("modifiedBefore" -> ""),
Seq("The timestamp provided", "modifiedbefore"))
executeTestWithBadOption(
Map("modifiedAfter" -> ""),
Seq("The timestamp provided", "modifiedafter"))
}
test("SPARK-31962: modifiedBefore/modifiedAfter filter takes into account local timezone " +
"when specified as an option.") {
Seq("modifiedbefore", "modifiedafter").foreach { filterName =>
// CET = UTC + 1 hour, HST = UTC - 10 hours
Seq("CET", "HST").foreach { tzId =>
testModifiedDateFilterWithTimezone(tzId, filterName)
}
}
}
test("Option pathGlobFilter: filter files correctly") {
withTempPath { path =>
val dataDir = path.getCanonicalPath
Seq("foo").toDS().write.text(dataDir)
Seq("bar").toDS().write.mode("append").orc(dataDir)
val df = spark.read.option("pathGlobFilter", "*.txt").text(dataDir)
checkAnswer(df, Row("foo"))
// Both glob pattern in option and path should be effective to filter files.
val df2 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*.orc")
checkAnswer(df2, Seq.empty)
val df3 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*xt")
checkAnswer(df3, Row("foo"))
}
}
test("Option pathGlobFilter: simple extension filtering should contains partition info") {
withTempPath { path =>
val input = Seq(("foo", 1), ("oof", 2)).toDF("a", "b")
input.write.partitionBy("b").text(path.getCanonicalPath)
Seq("bar").toDS().write.mode("append").orc(path.getCanonicalPath + "/b=1")
// If we use glob pattern in the path, the partition column won't be shown in the result.
val df = spark.read.text(path.getCanonicalPath + "/*/*.txt")
checkAnswer(df, input.select("a"))
val df2 = spark.read.option("pathGlobFilter", "*.txt").text(path.getCanonicalPath)
checkAnswer(df2, input)
}
}
private def executeTest(
dir: File,
fileDates: Seq[LocalDateTime],
expectedCount: Long,
modifiedBefore: Option[String] = None,
modifiedAfter: Option[String] = None): Unit = {
fileDates.foreach { fileDate =>
val file = createSingleFile(dir)
setFileTime(fileDate, file)
}
val schema = StructType(Seq(StructField("a", StringType)))
var dfReader = spark.read.format("csv").option("timeZone", "UTC").schema(schema)
modifiedBefore.foreach { opt => dfReader = dfReader.option("modifiedBefore", opt) }
modifiedAfter.foreach { opt => dfReader = dfReader.option("modifiedAfter", opt) }
if (expectedCount > 0) {
// without pathGlobFilter
val df1 = dfReader.load(dir.getCanonicalPath)
assert(df1.count() === expectedCount)
// pathGlobFilter matched
val df2 = dfReader.option("pathGlobFilter", "*.csv").load(dir.getCanonicalPath)
assert(df2.count() === expectedCount)
// pathGlobFilter mismatched
val df3 = dfReader.option("pathGlobFilter", "*.txt").load(dir.getCanonicalPath)
assert(df3.count() === 0)
} else {
val df = dfReader.load(dir.getCanonicalPath)
assert(df.count() === 0)
}
}
private def executeTestWithBadOption(
options: Map[String, String],
expectedMsgParts: Seq[String]): Unit = {
withTempDir { dir =>
createSingleFile(dir)
val exc = intercept[AnalysisException] {
var dfReader = spark.read.format("csv")
options.foreach { case (key, value) =>
dfReader = dfReader.option(key, value)
}
dfReader.load(dir.getCanonicalPath)
}
expectedMsgParts.foreach { msg => assert(exc.getMessage.contains(msg)) }
}
}
private def testModifiedDateFilterWithTimezone(
timezoneId: String,
filterParamName: String): Unit = {
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val zoneId: ZoneId = DateTimeUtils.getTimeZone(timezoneId).toZoneId
val strategyTimeInMicros =
ModifiedDateFilter.toThreshold(
curTime.toString,
timezoneId,
filterParamName)
val strategyTimeInSeconds = strategyTimeInMicros / 1000 / 1000
val curTimeAsSeconds = curTime.atZone(zoneId).toEpochSecond
withClue(s"timezone: $timezoneId / param: $filterParamName,") {
assert(strategyTimeInSeconds === curTimeAsSeconds)
}
}
private def createSingleFile(dir: File): File = {
val file = new File(dir, "temp" + Random.nextInt(1000000) + ".csv")
stringToFile(file, "text")
}
private def setFileTime(time: LocalDateTime, file: File): Boolean = {
val sameTime = time.toEpochSecond(ZoneOffset.UTC)
file.setLastModified(sameTime * 1000)
}
private def formatTime(time: LocalDateTime): String = {
time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
}
}

@ -19,6 +19,8 @@ package org.apache.spark.sql.streaming
import java.io.File
import java.net.URI
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
@ -40,7 +42,6 @@ import org.apache.spark.sql.execution.streaming.sources.MemorySink
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.{StructType, _}
import org.apache.spark.util.Utils
@ -2054,6 +2055,47 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
test("SPARK-31962: file stream source shouldn't allow modifiedBefore/modifiedAfter") {
def formatTime(time: LocalDateTime): String = {
time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
}
def assertOptionIsNotSupported(options: Map[String, String], path: String): Unit = {
val schema = StructType(Seq(StructField("a", StringType)))
var dsReader = spark.readStream
.format("csv")
.option("timeZone", "UTC")
.schema(schema)
options.foreach { case (k, v) => dsReader = dsReader.option(k, v) }
val df = dsReader.load(path)
testStream(df)(
ExpectFailure[IllegalArgumentException](
t => assert(t.getMessage.contains("is not allowed in file stream source")),
isFatalError = false)
)
}
withTempDir { dir =>
// "modifiedBefore"
val futureTime = LocalDateTime.now(ZoneOffset.UTC).plusYears(1)
val formattedFutureTime = formatTime(futureTime)
assertOptionIsNotSupported(Map("modifiedBefore" -> formattedFutureTime), dir.getCanonicalPath)
// "modifiedAfter"
val prevTime = LocalDateTime.now(ZoneOffset.UTC).minusYears(1)
val formattedPrevTime = formatTime(prevTime)
assertOptionIsNotSupported(Map("modifiedAfter" -> formattedPrevTime), dir.getCanonicalPath)
// both
assertOptionIsNotSupported(
Map("modifiedBefore" -> formattedFutureTime, "modifiedAfter" -> formattedPrevTime),
dir.getCanonicalPath)
}
}
private def createFile(content: String, src: File, tmp: File): File = {
val tempFile = Utils.tempFileWith(new File(tmp, "text"))
val finalFile = new File(src, tempFile.getName)