diff --git a/docs/sql-data-sources-generic-options.md b/docs/sql-data-sources-generic-options.md
index 6bcf48235b..2e4fc879a4 100644
--- a/docs/sql-data-sources-generic-options.md
+++ b/docs/sql-data-sources-generic-options.md
@@ -119,3 +119,40 @@ To load all files recursively, you can use:
{% include_example recursive_file_lookup r/RSparkSQLExample.R %}
+
+### Modification Time Path Filters
+
+`modifiedBefore` and `modifiedAfter` are options that can be
+applied together or separately in order to achieve greater
+granularity over which files may load during a Spark batch query.
+(Note that Structured Streaming file sources don't support these options.)
+
+* `modifiedBefore`: an optional timestamp to only include files with
+modification times occurring before the specified time. The provided timestamp
+must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+* `modifiedAfter`: an optional timestamp to only include files with
+modification times occurring after the specified time. The provided timestamp
+must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+
+When a timezone option is not provided, the timestamps will be interpreted according
+to the Spark session timezone (`spark.sql.session.timeZone`).
+
+To load files with paths matching a given modified time range, you can use:
+
+
+
+{% include_example load_with_modified_time_filter scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
+
+
+
+{% include_example load_with_modified_time_filter java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
+
+
+
+{% include_example load_with_modified_time_filter python/sql/datasource.py %}
+
+
+
+{% include_example load_with_modified_time_filter r/RSparkSQLExample.R %}
+
+
\ No newline at end of file
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
index 2295225387..46e740d78b 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -147,6 +147,22 @@ public class JavaSQLDataSourceExample {
// |file1.parquet|
// +-------------+
// $example off:load_with_path_glob_filter$
+ // $example on:load_with_modified_time_filter$
+ Dataset<Row> beforeFilterDF = spark.read().format("parquet")
+ // Only load files modified before 7/1/2020 at 05:30
+ .option("modifiedBefore", "2020-07-01T05:30:00")
+ // Only load files modified after 6/1/2020 at 05:30
+ .option("modifiedAfter", "2020-06-01T05:30:00")
+ // Interpret both times above relative to CST timezone
+ .option("timeZone", "CST")
+ .load("examples/src/main/resources/dir1");
+ beforeFilterDF.show();
+ // +-------------+
+ // | file|
+ // +-------------+
+ // |file1.parquet|
+ // +-------------+
+ // $example off:load_with_modified_time_filter$
}
private static void runBasicDataSourceExample(SparkSession spark) {
diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py
index eecd8c2d84..8c146ba0c9 100644
--- a/examples/src/main/python/sql/datasource.py
+++ b/examples/src/main/python/sql/datasource.py
@@ -67,6 +67,26 @@ def generic_file_source_options_example(spark):
# +-------------+
# $example off:load_with_path_glob_filter$
+ # $example on:load_with_modified_time_filter$
+ # Only load files modified before 07/01/2050 @ 08:30:00
+ df = spark.read.load("examples/src/main/resources/dir1",
+ format="parquet", modifiedBefore="2050-07-01T08:30:00")
+ df.show()
+ # +-------------+
+ # | file|
+ # +-------------+
+ # |file1.parquet|
+ # +-------------+
+ # Only load files modified after 06/01/2050 @ 08:30:00
+ df = spark.read.load("examples/src/main/resources/dir1",
+ format="parquet", modifiedAfter="2050-06-01T08:30:00")
+ df.show()
+ # +-------------+
+ # | file|
+ # +-------------+
+ # +-------------+
+ # $example off:load_with_modified_time_filter$
+
def basic_datasource_example(spark):
# $example on:generic_load_save_functions$
diff --git a/examples/src/main/r/RSparkSQLExample.R b/examples/src/main/r/RSparkSQLExample.R
index 8685cfb5c0..86ad533424 100644
--- a/examples/src/main/r/RSparkSQLExample.R
+++ b/examples/src/main/r/RSparkSQLExample.R
@@ -144,6 +144,14 @@ df <- read.df("examples/src/main/resources/dir1", "parquet", pathGlobFilter = "*
# 1 file1.parquet
# $example off:load_with_path_glob_filter$
+# $example on:load_with_modified_time_filter$
+beforeDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedBefore = "2020-07-01T05:30:00")
+# file
+# 1 file1.parquet
+afterDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedAfter = "2020-06-01T05:30:00")
+# file
+# $example off:load_with_modified_time_filter$
+
# $example on:manual_save_options_orc$
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
index 2c7abfcd33..90c0eeb5ba 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -81,6 +81,27 @@ object SQLDataSourceExample {
// |file1.parquet|
// +-------------+
// $example off:load_with_path_glob_filter$
+ // $example on:load_with_modified_time_filter$
+ val beforeFilterDF = spark.read.format("parquet")
+ // Files modified before 07/01/2020 at 05:30 are allowed
+ .option("modifiedBefore", "2020-07-01T05:30:00")
+ .load("examples/src/main/resources/dir1");
+ beforeFilterDF.show();
+ // +-------------+
+ // | file|
+ // +-------------+
+ // |file1.parquet|
+ // +-------------+
+ val afterFilterDF = spark.read.format("parquet")
+ // Files modified after 06/01/2020 at 05:30 are allowed
+ .option("modifiedAfter", "2020-06-01T05:30:00")
+ .load("examples/src/main/resources/dir1");
+ afterFilterDF.show();
+ // +-------------+
+ // | file|
+ // +-------------+
+ // +-------------+
+ // $example off:load_with_modified_time_filter$
}
private def runBasicDataSourceExample(spark: SparkSession): Unit = {
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 2ed991c87f..bb31e6a3e0 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -125,6 +125,12 @@ class DataFrameReader(OptionUtils):
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.
+ * ``modifiedBefore``: an optional timestamp to only include files with
+ modification times occurring before the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ * ``modifiedAfter``: an optional timestamp to only include files with
+ modification times occurring after the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
"""
self._jreader = self._jreader.option(key, to_str(value))
return self
@@ -149,6 +155,12 @@ class DataFrameReader(OptionUtils):
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.
+ * ``modifiedBefore``: an optional timestamp to only include files with
+ modification times occurring before the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ * ``modifiedAfter``: an optional timestamp to only include files with
+ modification times occurring after the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
"""
for k in options:
self._jreader = self._jreader.option(k, to_str(options[k]))
@@ -203,7 +215,8 @@ class DataFrameReader(OptionUtils):
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None,
dropFieldIfAllNull=None, encoding=None, locale=None, pathGlobFilter=None,
- recursiveFileLookup=None, allowNonNumericNumbers=None):
+ recursiveFileLookup=None, allowNonNumericNumbers=None,
+ modifiedBefore=None, modifiedAfter=None):
"""
Loads JSON files and returns the results as a :class:`DataFrame`.
@@ -322,6 +335,13 @@ class DataFrameReader(OptionUtils):
``+Infinity`` and ``Infinity``.
* ``-INF``: for negative infinity, alias ``-Infinity``.
* ``NaN``: for other not-a-numbers, like result of division by zero.
+ modifiedBefore : an optional timestamp to only include files with
+ modification times occurring before the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ modifiedAfter : an optional timestamp to only include files with
+ modification times occurring after the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+
Examples
--------
@@ -344,6 +364,7 @@ class DataFrameReader(OptionUtils):
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
locale=locale, pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
+ modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter,
allowNonNumericNumbers=allowNonNumericNumbers)
if isinstance(path, str):
path = [path]
@@ -410,6 +431,15 @@ class DataFrameReader(OptionUtils):
disables
`partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_. # noqa
+ modification times occurring before the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ modifiedBefore (batch only) : an optional timestamp to only include files with
+ modification times occurring before the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ modifiedAfter (batch only) : an optional timestamp to only include files with
+ modification times occurring after the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+
Examples
--------
>>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
@@ -418,13 +448,18 @@ class DataFrameReader(OptionUtils):
"""
mergeSchema = options.get('mergeSchema', None)
pathGlobFilter = options.get('pathGlobFilter', None)
+ modifiedBefore = options.get('modifiedBefore', None)
+ modifiedAfter = options.get('modifiedAfter', None)
recursiveFileLookup = options.get('recursiveFileLookup', None)
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
- recursiveFileLookup=recursiveFileLookup)
+ recursiveFileLookup=recursiveFileLookup, modifiedBefore=modifiedBefore,
+ modifiedAfter=modifiedAfter)
+
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,
- recursiveFileLookup=None):
+ recursiveFileLookup=None, modifiedBefore=None,
+ modifiedAfter=None):
"""
Loads text files and returns a :class:`DataFrame` whose schema starts with a
string column named "value", and followed by partitioned columns if there
@@ -453,6 +488,15 @@ class DataFrameReader(OptionUtils):
recursively scan a directory for files. Using this option disables
`partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_. # noqa
+ modification times occurring before the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ modifiedBefore (batch only) : an optional timestamp to only include files with
+ modification times occurring before the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ modifiedAfter (batch only) : an optional timestamp to only include files with
+ modification times occurring after the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+
Examples
--------
>>> df = spark.read.text('python/test_support/sql/text-test.txt')
@@ -464,7 +508,9 @@ class DataFrameReader(OptionUtils):
"""
self._set_opts(
wholetext=wholetext, lineSep=lineSep, pathGlobFilter=pathGlobFilter,
- recursiveFileLookup=recursiveFileLookup)
+ recursiveFileLookup=recursiveFileLookup, modifiedBefore=modifiedBefore,
+ modifiedAfter=modifiedAfter)
+
if isinstance(paths, str):
paths = [paths]
return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
@@ -476,7 +522,7 @@ class DataFrameReader(OptionUtils):
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
- pathGlobFilter=None, recursiveFileLookup=None):
+ pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None):
r"""Loads a CSV file and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -631,6 +677,15 @@ class DataFrameReader(OptionUtils):
recursively scan a directory for files. Using this option disables
`partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_. # noqa
+ modification times occurring before the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ modifiedBefore (batch only) : an optional timestamp to only include files with
+ modification times occurring before the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ modifiedAfter (batch only) : an optional timestamp to only include files with
+ modification times occurring after the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+
Examples
--------
>>> df = spark.read.csv('python/test_support/sql/ages.csv')
@@ -652,7 +707,8 @@ class DataFrameReader(OptionUtils):
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
- pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
+ pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
+ modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter)
if isinstance(path, str):
path = [path]
if type(path) == list:
@@ -679,7 +735,8 @@ class DataFrameReader(OptionUtils):
else:
raise TypeError("path can be only string, list or RDD")
- def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None):
+ def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None,
+ modifiedBefore=None, modifiedAfter=None):
"""Loads ORC files, returning the result as a :class:`DataFrame`.
.. versionadded:: 1.5.0
@@ -701,6 +758,15 @@ class DataFrameReader(OptionUtils):
disables
`partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_. # noqa
+ modification times occurring before the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ modifiedBefore : an optional timestamp to only include files with
+ modification times occurring before the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ modifiedAfter : an optional timestamp to only include files with
+ modification times occurring after the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+
Examples
--------
>>> df = spark.read.orc('python/test_support/sql/orc_partitioned')
@@ -708,6 +774,7 @@ class DataFrameReader(OptionUtils):
[('a', 'bigint'), ('b', 'int'), ('c', 'int')]
"""
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
+ modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter,
recursiveFileLookup=recursiveFileLookup)
if isinstance(path, str):
path = [path]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 276d5d29bf..b26bc6441b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -493,6 +493,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
* It does not change the behavior of partition discovery.
+ * `modifiedBefore` (batch only): an optional timestamp to only include files with
+ * modification times occurring before the specified time. The provided timestamp
+ * must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ * `modifiedAfter` (batch only): an optional timestamp to only include files with
+ * modification times occurring after the specified time. The provided timestamp
+ * must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
* `recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery
* `allowNonNumericNumbers` (default `true`): allows JSON parser to recognize set of
@@ -750,6 +756,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
* It does not change the behavior of partition discovery.
+ * `modifiedBefore` (batch only): an optional timestamp to only include files with
+ * modification times occurring before the specified time. The provided timestamp
+ * must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ * `modifiedAfter` (batch only): an optional timestamp to only include files with
+ * modification times occurring after the specified time. The provided timestamp
+ * must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
* `recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery
*
@@ -781,6 +793,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
* It does not change the behavior of partition discovery.
+ * `modifiedBefore` (batch only): an optional timestamp to only include files with
+ * modification times occurring before the specified time. The provided timestamp
+ * must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ * `modifiedAfter` (batch only): an optional timestamp to only include files with
+ * modification times occurring after the specified time. The provided timestamp
+ * must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
* `recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery
*
@@ -814,6 +832,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
* It does not change the behavior of partition discovery.
+ * `modifiedBefore` (batch only): an optional timestamp to only include files with
+ * modification times occurring before the specified time. The provided timestamp
+ * must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ * `modifiedAfter` (batch only): an optional timestamp to only include files with
+ * modification times occurring after the specified time. The provided timestamp
+ * must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
* `recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery
*
@@ -880,6 +904,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
* It does not change the behavior of partition discovery.
+ * `modifiedBefore` (batch only): an optional timestamp to only include files with
+ * modification times occurring before the specified time. The provided timestamp
+ * must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ * `modifiedAfter` (batch only): an optional timestamp to only include files with
+ * modification times occurring after the specified time. The provided timestamp
+ * must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
* `recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery
*
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index fed9614347..5b0d0606da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -57,13 +57,10 @@ abstract class PartitioningAwareFileIndex(
protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
private val caseInsensitiveMap = CaseInsensitiveMap(parameters)
+ private val pathFilters = PathFilterFactory.create(caseInsensitiveMap)
- protected lazy val pathGlobFilter: Option[GlobFilter] =
- caseInsensitiveMap.get("pathGlobFilter").map(new GlobFilter(_))
-
- protected def matchGlobPattern(file: FileStatus): Boolean = {
- pathGlobFilter.forall(_.accept(file.getPath))
- }
+ protected def matchPathPattern(file: FileStatus): Boolean =
+ pathFilters.forall(_.accept(file))
protected lazy val recursiveFileLookup: Boolean = {
caseInsensitiveMap.getOrElse("recursiveFileLookup", "false").toBoolean
@@ -86,7 +83,7 @@ abstract class PartitioningAwareFileIndex(
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
case Some(existingDir) =>
// Directory has children files in it, return them
- existingDir.filter(f => matchGlobPattern(f) && isNonEmptyFile(f))
+ existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f))
case None =>
// Directory does not exist, or has no children files
@@ -135,7 +132,7 @@ abstract class PartitioningAwareFileIndex(
} else {
leafFiles.values.toSeq
}
- files.filter(matchGlobPattern)
+ files.filter(matchPathPattern)
}
protected def inferPartitioning(): PartitionSpec = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathFilters.scala
new file mode 100644
index 0000000000..c8f23988f9
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathFilters.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Locale, TimeZone}
+
+import org.apache.hadoop.fs.{FileStatus, GlobFilter}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.unsafe.types.UTF8String
+
+trait PathFilterStrategy extends Serializable {
+ def accept(fileStatus: FileStatus): Boolean
+}
+
+trait StrategyBuilder {
+ def create(parameters: CaseInsensitiveMap[String]): Option[PathFilterStrategy]
+}
+
+class PathGlobFilter(filePatten: String) extends PathFilterStrategy {
+
+ private val globFilter = new GlobFilter(filePatten)
+
+ override def accept(fileStatus: FileStatus): Boolean =
+ globFilter.accept(fileStatus.getPath)
+}
+
+object PathGlobFilter extends StrategyBuilder {
+ val PARAM_NAME = "pathglobfilter"
+
+ override def create(parameters: CaseInsensitiveMap[String]): Option[PathFilterStrategy] = {
+ parameters.get(PARAM_NAME).map(new PathGlobFilter(_))
+ }
+}
+
+/**
+ * Provide modifiedAfter and modifiedBefore options when
+ * filtering from a batch-based file data source.
+ *
+ * Example Usages
+ * Load all CSV files modified after date:
+ * {{{
+ * spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load()
+ * }}}
+ *
+ * Load all CSV files modified before date:
+ * {{{
+ * spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load()
+ * }}}
+ *
+ * Load all CSV files modified between two dates:
+ * {{{
+ * spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00")
+ * .option("modifiedBefore","2020-06-15T05:00:00").load()
+ * }}}
+ */
+abstract class ModifiedDateFilter extends PathFilterStrategy {
+
+ def timeZoneId: String
+
+ protected def localTime(micros: Long): Long =
+ DateTimeUtils.fromUTCTime(micros, timeZoneId)
+}
+
+object ModifiedDateFilter {
+
+ def getTimeZoneId(options: CaseInsensitiveMap[String]): String = {
+ options.getOrElse(
+ DateTimeUtils.TIMEZONE_OPTION.toLowerCase(Locale.ROOT),
+ SQLConf.get.sessionLocalTimeZone)
+ }
+
+ def toThreshold(timeString: String, timeZoneId: String, strategy: String): Long = {
+ val timeZone: TimeZone = DateTimeUtils.getTimeZone(timeZoneId)
+ val ts = UTF8String.fromString(timeString)
+ DateTimeUtils.stringToTimestamp(ts, timeZone.toZoneId).getOrElse {
+ throw new AnalysisException(
+ s"The timestamp provided for the '$strategy' option is invalid. The expected format " +
+ s"is 'YYYY-MM-DDTHH:mm:ss', but the provided timestamp: $timeString")
+ }
+ }
+}
+
+/**
+ * Filter used to determine whether file was modified before the provided timestamp.
+ */
+class ModifiedBeforeFilter(thresholdTime: Long, val timeZoneId: String)
+ extends ModifiedDateFilter {
+
+ override def accept(fileStatus: FileStatus): Boolean =
+ // We standardize on microseconds wherever possible
+ // getModificationTime returns in milliseconds
+ thresholdTime - localTime(DateTimeUtils.millisToMicros(fileStatus.getModificationTime)) > 0
+}
+
+object ModifiedBeforeFilter extends StrategyBuilder {
+ import ModifiedDateFilter._
+
+ val PARAM_NAME = "modifiedbefore"
+
+ override def create(parameters: CaseInsensitiveMap[String]): Option[PathFilterStrategy] = {
+ parameters.get(PARAM_NAME).map { value =>
+ val timeZoneId = getTimeZoneId(parameters)
+ val thresholdTime = toThreshold(value, timeZoneId, PARAM_NAME)
+ new ModifiedBeforeFilter(thresholdTime, timeZoneId)
+ }
+ }
+}
+
+/**
+ * Filter used to determine whether file was modified after the provided timestamp.
+ */
+class ModifiedAfterFilter(thresholdTime: Long, val timeZoneId: String)
+ extends ModifiedDateFilter {
+
+ override def accept(fileStatus: FileStatus): Boolean =
+ // getModificationTime returns in milliseconds
+ // We standardize on microseconds wherever possible
+ localTime(DateTimeUtils.millisToMicros(fileStatus.getModificationTime)) - thresholdTime > 0
+}
+
+object ModifiedAfterFilter extends StrategyBuilder {
+ import ModifiedDateFilter._
+
+ val PARAM_NAME = "modifiedafter"
+
+ override def create(parameters: CaseInsensitiveMap[String]): Option[PathFilterStrategy] = {
+ parameters.get(PARAM_NAME).map { value =>
+ val timeZoneId = getTimeZoneId(parameters)
+ val thresholdTime = toThreshold(value, timeZoneId, PARAM_NAME)
+ new ModifiedAfterFilter(thresholdTime, timeZoneId)
+ }
+ }
+}
+
+object PathFilterFactory {
+
+ private val strategies =
+ Seq(PathGlobFilter, ModifiedBeforeFilter, ModifiedAfterFilter)
+
+ def create(parameters: CaseInsensitiveMap[String]): Seq[PathFilterStrategy] = {
+ strategies.flatMap { _.create(parameters) }
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index 712ed1585b..6f43542fd6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -23,6 +23,7 @@ import scala.util.Try
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.execution.datasources.{ModifiedAfterFilter, ModifiedBeforeFilter}
import org.apache.spark.util.Utils
/**
@@ -32,6 +33,16 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+ checkDisallowedOptions(parameters)
+
+ private def checkDisallowedOptions(options: Map[String, String]): Unit = {
+ Seq(ModifiedBeforeFilter.PARAM_NAME, ModifiedAfterFilter.PARAM_NAME).foreach { param =>
+ if (parameters.contains(param)) {
+ throw new IllegalArgumentException(s"option '$param' is not allowed in file stream sources")
+ }
+ }
+ }
+
val maxFilesPerTrigger: Option[Int] = parameters.get("maxFilesPerTrigger").map { str =>
Try(str.toInt).toOption.filter(_ > 0).getOrElse {
throw new IllegalArgumentException(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index b27c114518..876f62803d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -577,38 +577,6 @@ class FileBasedDataSourceSuite extends QueryTest
}
}
- test("Option pathGlobFilter: filter files correctly") {
- withTempPath { path =>
- val dataDir = path.getCanonicalPath
- Seq("foo").toDS().write.text(dataDir)
- Seq("bar").toDS().write.mode("append").orc(dataDir)
- val df = spark.read.option("pathGlobFilter", "*.txt").text(dataDir)
- checkAnswer(df, Row("foo"))
-
- // Both glob pattern in option and path should be effective to filter files.
- val df2 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*.orc")
- checkAnswer(df2, Seq.empty)
-
- val df3 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*xt")
- checkAnswer(df3, Row("foo"))
- }
- }
-
- test("Option pathGlobFilter: simple extension filtering should contains partition info") {
- withTempPath { path =>
- val input = Seq(("foo", 1), ("oof", 2)).toDF("a", "b")
- input.write.partitionBy("b").text(path.getCanonicalPath)
- Seq("bar").toDS().write.mode("append").orc(path.getCanonicalPath + "/b=1")
-
- // If we use glob pattern in the path, the partition column won't be shown in the result.
- val df = spark.read.text(path.getCanonicalPath + "/*/*.txt")
- checkAnswer(df, input.select("a"))
-
- val df2 = spark.read.option("pathGlobFilter", "*.txt").text(path.getCanonicalPath)
- checkAnswer(df2, input)
- }
- }
-
test("Option recursiveFileLookup: recursive loading correctly") {
val expectedFileList = mutable.ListBuffer[String]()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterStrategySuite.scala
new file mode 100644
index 0000000000..b965a78c9e
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterStrategySuite.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.test.SharedSparkSession
+
+class PathFilterStrategySuite extends QueryTest with SharedSparkSession {
+
+ test("SPARK-31962: PathFilterStrategies - modifiedAfter option") {
+ val options =
+ CaseInsensitiveMap[String](Map("modifiedAfter" -> "2010-10-01T01:01:00"))
+ val strategy = PathFilterFactory.create(options)
+ assert(strategy.head.isInstanceOf[ModifiedAfterFilter])
+ assert(strategy.size == 1)
+ }
+
+ test("SPARK-31962: PathFilterStrategies - modifiedBefore option") {
+ val options =
+ CaseInsensitiveMap[String](Map("modifiedBefore" -> "2020-10-01T01:01:00"))
+ val strategy = PathFilterFactory.create(options)
+ assert(strategy.head.isInstanceOf[ModifiedBeforeFilter])
+ assert(strategy.size == 1)
+ }
+
+ test("SPARK-31962: PathFilterStrategies - pathGlobFilter option") {
+ val options = CaseInsensitiveMap[String](Map("pathGlobFilter" -> "*.txt"))
+ val strategy = PathFilterFactory.create(options)
+ assert(strategy.head.isInstanceOf[PathGlobFilter])
+ assert(strategy.size == 1)
+ }
+
+ test("SPARK-31962: PathFilterStrategies - no options") {
+ val options = CaseInsensitiveMap[String](Map.empty)
+ val strategy = PathFilterFactory.create(options)
+ assert(strategy.isEmpty)
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
new file mode 100644
index 0000000000..1af2adfd86
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.{LocalDateTime, ZoneId, ZoneOffset}
+import java.time.format.DateTimeFormatter
+
+import scala.util.Random
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+class PathFilterSuite extends QueryTest with SharedSparkSession {
+ import testImplicits._
+
+ test("SPARK-31962: modifiedBefore specified" +
+ " and sharing same timestamp with file last modified time.") {
+ withTempDir { dir =>
+ val curTime = LocalDateTime.now(ZoneOffset.UTC)
+ executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formatTime(curTime)))
+ }
+ }
+
+ test("SPARK-31962: modifiedAfter specified" +
+ " and sharing same timestamp with file last modified time.") {
+ withTempDir { dir =>
+ val curTime = LocalDateTime.now(ZoneOffset.UTC)
+ executeTest(dir, Seq(curTime), 0, modifiedAfter = Some(formatTime(curTime)))
+ }
+ }
+
+ test("SPARK-31962: modifiedBefore and modifiedAfter option" +
+ " share same timestamp with file last modified time.") {
+ withTempDir { dir =>
+ val curTime = LocalDateTime.now(ZoneOffset.UTC)
+ val formattedTime = formatTime(curTime)
+ executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formattedTime),
+ modifiedAfter = Some(formattedTime))
+ }
+ }
+
+ test("SPARK-31962: modifiedBefore and modifiedAfter option" +
+ " share same timestamp with earlier file last modified time.") {
+ withTempDir { dir =>
+ val curTime = LocalDateTime.now(ZoneOffset.UTC)
+ val fileTime = curTime.minusDays(3)
+ val formattedTime = formatTime(curTime)
+ executeTest(dir, Seq(fileTime), 0, modifiedBefore = Some(formattedTime),
+ modifiedAfter = Some(formattedTime))
+ }
+ }
+
+ test("SPARK-31962: modifiedBefore and modifiedAfter option" +
+ " share same timestamp with later file last modified time.") {
+ withTempDir { dir =>
+ val curTime = LocalDateTime.now(ZoneOffset.UTC)
+ val formattedTime = formatTime(curTime)
+      executeTest(dir, Seq(curTime.plusDays(3)), 0, modifiedBefore = Some(formattedTime),
+ modifiedAfter = Some(formattedTime))
+ }
+ }
+
+ test("SPARK-31962: when modifiedAfter specified with a past date") {
+ withTempDir { dir =>
+ val curTime = LocalDateTime.now(ZoneOffset.UTC)
+ val pastTime = curTime.minusYears(1)
+ val formattedTime = formatTime(pastTime)
+ executeTest(dir, Seq(curTime), 1, modifiedAfter = Some(formattedTime))
+ }
+ }
+
+ test("SPARK-31962: when modifiedBefore specified with a future date") {
+ withTempDir { dir =>
+ val curTime = LocalDateTime.now(ZoneOffset.UTC)
+ val futureTime = curTime.plusYears(1)
+ val formattedTime = formatTime(futureTime)
+ executeTest(dir, Seq(curTime), 1, modifiedBefore = Some(formattedTime))
+ }
+ }
+
+ test("SPARK-31962: with modifiedBefore option provided using a past date") {
+ withTempDir { dir =>
+ val curTime = LocalDateTime.now(ZoneOffset.UTC)
+ val pastTime = curTime.minusYears(1)
+ val formattedTime = formatTime(pastTime)
+ executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formattedTime))
+ }
+ }
+
+ test("SPARK-31962: modifiedAfter specified with a past date, multiple files, one valid") {
+ withTempDir { dir =>
+ val fileTime1 = LocalDateTime.now(ZoneOffset.UTC)
+ val fileTime2 = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)
+ val pastTime = fileTime1.minusYears(1)
+ val formattedTime = formatTime(pastTime)
+ executeTest(dir, Seq(fileTime1, fileTime2), 1, modifiedAfter = Some(formattedTime))
+ }
+ }
+
+ test("SPARK-31962: modifiedAfter specified with a past date, multiple files, both valid") {
+ withTempDir { dir =>
+ val curTime = LocalDateTime.now(ZoneOffset.UTC)
+ val pastTime = curTime.minusYears(1)
+ val formattedTime = formatTime(pastTime)
+ executeTest(dir, Seq(curTime, curTime), 2, modifiedAfter = Some(formattedTime))
+ }
+ }
+
+ test("SPARK-31962: modifiedAfter specified with a past date, multiple files, none valid") {
+ withTempDir { dir =>
+ val fileTime = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)
+ val pastTime = LocalDateTime.now(ZoneOffset.UTC).minusYears(1)
+ val formattedTime = formatTime(pastTime)
+ executeTest(dir, Seq(fileTime, fileTime), 0, modifiedAfter = Some(formattedTime))
+ }
+ }
+
+ test("SPARK-31962: modifiedBefore specified with a future date, multiple files, both valid") {
+ withTempDir { dir =>
+ val fileTime = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)
+ val futureTime = LocalDateTime.now(ZoneOffset.UTC).plusYears(1)
+ val formattedTime = formatTime(futureTime)
+ executeTest(dir, Seq(fileTime, fileTime), 2, modifiedBefore = Some(formattedTime))
+ }
+ }
+
+ test("SPARK-31962: modifiedBefore specified with a future date, multiple files, one valid") {
+ withTempDir { dir =>
+ val curTime = LocalDateTime.now(ZoneOffset.UTC)
+ val fileTime1 = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)
+ val fileTime2 = curTime.plusDays(3)
+ val formattedTime = formatTime(curTime)
+ executeTest(dir, Seq(fileTime1, fileTime2), 1, modifiedBefore = Some(formattedTime))
+ }
+ }
+
+ test("SPARK-31962: modifiedBefore specified with a future date, multiple files, none valid") {
+ withTempDir { dir =>
+ val fileTime = LocalDateTime.now(ZoneOffset.UTC).minusDays(1)
+ val formattedTime = formatTime(fileTime)
+ executeTest(dir, Seq(fileTime, fileTime), 0, modifiedBefore = Some(formattedTime))
+ }
+ }
+
+ test("SPARK-31962: modifiedBefore/modifiedAfter is specified with an invalid date") {
+ executeTestWithBadOption(
+ Map("modifiedBefore" -> "2024-05+1 01:00:00"),
+ Seq("The timestamp provided", "modifiedbefore", "2024-05+1 01:00:00"))
+
+ executeTestWithBadOption(
+ Map("modifiedAfter" -> "2024-05+1 01:00:00"),
+ Seq("The timestamp provided", "modifiedafter", "2024-05+1 01:00:00"))
+ }
+
+ test("SPARK-31962: modifiedBefore/modifiedAfter - empty option") {
+ executeTestWithBadOption(
+ Map("modifiedBefore" -> ""),
+ Seq("The timestamp provided", "modifiedbefore"))
+
+ executeTestWithBadOption(
+ Map("modifiedAfter" -> ""),
+ Seq("The timestamp provided", "modifiedafter"))
+ }
+
+ test("SPARK-31962: modifiedBefore/modifiedAfter filter takes into account local timezone " +
+ "when specified as an option.") {
+ Seq("modifiedbefore", "modifiedafter").foreach { filterName =>
+ // CET = UTC + 1 hour, HST = UTC - 10 hours
+ Seq("CET", "HST").foreach { tzId =>
+ testModifiedDateFilterWithTimezone(tzId, filterName)
+ }
+ }
+ }
+
+ test("Option pathGlobFilter: filter files correctly") {
+ withTempPath { path =>
+ val dataDir = path.getCanonicalPath
+ Seq("foo").toDS().write.text(dataDir)
+ Seq("bar").toDS().write.mode("append").orc(dataDir)
+ val df = spark.read.option("pathGlobFilter", "*.txt").text(dataDir)
+ checkAnswer(df, Row("foo"))
+
+ // Both glob pattern in option and path should be effective to filter files.
+ val df2 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*.orc")
+ checkAnswer(df2, Seq.empty)
+
+ val df3 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*xt")
+ checkAnswer(df3, Row("foo"))
+ }
+ }
+
+ test("Option pathGlobFilter: simple extension filtering should contains partition info") {
+ withTempPath { path =>
+ val input = Seq(("foo", 1), ("oof", 2)).toDF("a", "b")
+ input.write.partitionBy("b").text(path.getCanonicalPath)
+ Seq("bar").toDS().write.mode("append").orc(path.getCanonicalPath + "/b=1")
+
+ // If we use glob pattern in the path, the partition column won't be shown in the result.
+ val df = spark.read.text(path.getCanonicalPath + "/*/*.txt")
+ checkAnswer(df, input.select("a"))
+
+ val df2 = spark.read.option("pathGlobFilter", "*.txt").text(path.getCanonicalPath)
+ checkAnswer(df2, input)
+ }
+ }
+
+ private def executeTest(
+ dir: File,
+ fileDates: Seq[LocalDateTime],
+ expectedCount: Long,
+ modifiedBefore: Option[String] = None,
+ modifiedAfter: Option[String] = None): Unit = {
+ fileDates.foreach { fileDate =>
+ val file = createSingleFile(dir)
+ setFileTime(fileDate, file)
+ }
+
+ val schema = StructType(Seq(StructField("a", StringType)))
+
+ var dfReader = spark.read.format("csv").option("timeZone", "UTC").schema(schema)
+ modifiedBefore.foreach { opt => dfReader = dfReader.option("modifiedBefore", opt) }
+ modifiedAfter.foreach { opt => dfReader = dfReader.option("modifiedAfter", opt) }
+
+ if (expectedCount > 0) {
+ // without pathGlobFilter
+ val df1 = dfReader.load(dir.getCanonicalPath)
+ assert(df1.count() === expectedCount)
+
+ // pathGlobFilter matched
+ val df2 = dfReader.option("pathGlobFilter", "*.csv").load(dir.getCanonicalPath)
+ assert(df2.count() === expectedCount)
+
+ // pathGlobFilter mismatched
+ val df3 = dfReader.option("pathGlobFilter", "*.txt").load(dir.getCanonicalPath)
+ assert(df3.count() === 0)
+ } else {
+ val df = dfReader.load(dir.getCanonicalPath)
+ assert(df.count() === 0)
+ }
+ }
+
+ private def executeTestWithBadOption(
+ options: Map[String, String],
+ expectedMsgParts: Seq[String]): Unit = {
+ withTempDir { dir =>
+ createSingleFile(dir)
+ val exc = intercept[AnalysisException] {
+ var dfReader = spark.read.format("csv")
+ options.foreach { case (key, value) =>
+ dfReader = dfReader.option(key, value)
+ }
+ dfReader.load(dir.getCanonicalPath)
+ }
+ expectedMsgParts.foreach { msg => assert(exc.getMessage.contains(msg)) }
+ }
+ }
+
+ private def testModifiedDateFilterWithTimezone(
+ timezoneId: String,
+ filterParamName: String): Unit = {
+ val curTime = LocalDateTime.now(ZoneOffset.UTC)
+ val zoneId: ZoneId = DateTimeUtils.getTimeZone(timezoneId).toZoneId
+ val strategyTimeInMicros =
+ ModifiedDateFilter.toThreshold(
+ curTime.toString,
+ timezoneId,
+ filterParamName)
+ val strategyTimeInSeconds = strategyTimeInMicros / 1000 / 1000
+
+ val curTimeAsSeconds = curTime.atZone(zoneId).toEpochSecond
+ withClue(s"timezone: $timezoneId / param: $filterParamName,") {
+ assert(strategyTimeInSeconds === curTimeAsSeconds)
+ }
+ }
+
+ private def createSingleFile(dir: File): File = {
+ val file = new File(dir, "temp" + Random.nextInt(1000000) + ".csv")
+ stringToFile(file, "text")
+ }
+
+ private def setFileTime(time: LocalDateTime, file: File): Boolean = {
+ val sameTime = time.toEpochSecond(ZoneOffset.UTC)
+ file.setLastModified(sameTime * 1000)
+ }
+
+ private def formatTime(time: LocalDateTime): String = {
+ time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index cf9664a976..718095003b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.streaming
import java.io.File
import java.net.URI
+import java.time.{LocalDateTime, ZoneOffset}
+import java.time.format.DateTimeFormatter
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
@@ -40,7 +42,6 @@ import org.apache.spark.sql.execution.streaming.sources.MemorySink
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types._
import org.apache.spark.sql.types.{StructType, _}
import org.apache.spark.util.Utils
@@ -2054,6 +2055,47 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
+ test("SPARK-31962: file stream source shouldn't allow modifiedBefore/modifiedAfter") {
+ def formatTime(time: LocalDateTime): String = {
+ time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+ }
+
+ def assertOptionIsNotSupported(options: Map[String, String], path: String): Unit = {
+ val schema = StructType(Seq(StructField("a", StringType)))
+ var dsReader = spark.readStream
+ .format("csv")
+ .option("timeZone", "UTC")
+ .schema(schema)
+
+ options.foreach { case (k, v) => dsReader = dsReader.option(k, v) }
+
+ val df = dsReader.load(path)
+
+ testStream(df)(
+ ExpectFailure[IllegalArgumentException](
+ t => assert(t.getMessage.contains("is not allowed in file stream source")),
+ isFatalError = false)
+ )
+ }
+
+ withTempDir { dir =>
+ // "modifiedBefore"
+ val futureTime = LocalDateTime.now(ZoneOffset.UTC).plusYears(1)
+ val formattedFutureTime = formatTime(futureTime)
+ assertOptionIsNotSupported(Map("modifiedBefore" -> formattedFutureTime), dir.getCanonicalPath)
+
+ // "modifiedAfter"
+ val prevTime = LocalDateTime.now(ZoneOffset.UTC).minusYears(1)
+ val formattedPrevTime = formatTime(prevTime)
+ assertOptionIsNotSupported(Map("modifiedAfter" -> formattedPrevTime), dir.getCanonicalPath)
+
+ // both
+ assertOptionIsNotSupported(
+ Map("modifiedBefore" -> formattedFutureTime, "modifiedAfter" -> formattedPrevTime),
+ dir.getCanonicalPath)
+ }
+ }
+
private def createFile(content: String, src: File, tmp: File): File = {
val tempFile = Utils.tempFileWith(new File(tmp, "text"))
val finalFile = new File(src, tempFile.getName)