From d338af3101a4c986b5e979e8fdc63b8551e12d29 Mon Sep 17 00:00:00 2001
From: CC Highman
Date: Mon, 23 Nov 2020 08:30:41 +0900
Subject: [PATCH] [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore
 options when filtering from a batch-based file data source

### What changes were proposed in this pull request?

Two new options, _modifiedBefore_ and _modifiedAfter_, are provided, each expecting a value in 'YYYY-MM-DDTHH:mm:ss' format. _PartitioningAwareFileIndex_ considers these options while checking for files, just before applying _PathFilters_ such as `pathGlobFilter`. A new PathFilter class was derived to perform this filtering, and general housekeeping was done around the classes extending PathFilter for neatness. Since several path filters may apply at once, logic was introduced to handle multiple potential path filters, with the associated tests written.

### Why are the changes needed?

When loading files from a data source, there can often be thousands of files under a given path. In many cases we want to load from a folder path but only consider files whose modification times fall after a certain point. Out of thousands of candidate files, only those with modification times later than the specified timestamp are then considered. This saves significant time and removes the complexity of managing this filtering in application code.

### Does this PR introduce _any_ user-facing change?

This PR introduces two options that can be used with batch-based Spark file data sources. The documentation was updated with an example and a description of the new data source options.

**Example Usages**

_Load all CSV files modified after date:_
`spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load()`

_Load all CSV files modified before date:_
`spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load()`

_Load all CSV files modified between two dates:_
`spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00").option("modifiedBefore","2020-06-15T05:00:00").load()`

### How was this patch tested?

A handful of unit tests were added covering the positive, negative, and edge-case code paths. It's also live in a handful of our Databricks dev environments. (quoted from cchighman)

Closes #30411 from HeartSaVioR/SPARK-31962.
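_Load files modified within a window, interpreting both bounds in an explicit timezone (illustrative sketch; the directory path is a placeholder):_

```scala
// Illustrative sketch: combine both bounds with an explicit timeZone option
// so the provided timestamps are interpreted consistently.
// "/path/to/dir" is a hypothetical directory, not part of this patch.
val df = spark.read.format("parquet")
  .option("modifiedAfter", "2020-06-01T05:30:00")   // keep files modified after this time
  .option("modifiedBefore", "2020-07-01T05:30:00")  // ...and before this time
  .option("timeZone", "UTC")                        // interpret both timestamps as UTC
  .load("/path/to/dir")
```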
Lead-authored-by: CC Highman Co-authored-by: Jungtaek Lim (HeartSaVioR) Signed-off-by: Jungtaek Lim (HeartSaVioR) --- docs/sql-data-sources-generic-options.md | 37 +++ .../sql/JavaSQLDataSourceExample.java | 16 + examples/src/main/python/sql/datasource.py | 20 ++ examples/src/main/r/RSparkSQLExample.R | 8 + .../examples/sql/SQLDataSourceExample.scala | 21 ++ python/pyspark/sql/readwriter.py | 81 ++++- .../apache/spark/sql/DataFrameReader.scala | 30 ++ .../PartitioningAwareFileIndex.scala | 13 +- .../execution/datasources/pathFilters.scala | 161 +++++++++ .../streaming/FileStreamOptions.scala | 11 + .../spark/sql/FileBasedDataSourceSuite.scala | 32 -- .../datasources/PathFilterStrategySuite.scala | 54 +++ .../datasources/PathFilterSuite.scala | 307 ++++++++++++++++++ .../sql/streaming/FileStreamSourceSuite.scala | 44 ++- 14 files changed, 787 insertions(+), 48 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathFilters.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterStrategySuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala diff --git a/docs/sql-data-sources-generic-options.md b/docs/sql-data-sources-generic-options.md index 6bcf48235b..2e4fc879a4 100644 --- a/docs/sql-data-sources-generic-options.md +++ b/docs/sql-data-sources-generic-options.md @@ -119,3 +119,40 @@ To load all files recursively, you can use: {% include_example recursive_file_lookup r/RSparkSQLExample.R %} + +### Modification Time Path Filters + +`modifiedBefore` and `modifiedAfter` are options that can be +applied together or separately in order to achieve greater +granularity over which files may load during a Spark batch query. +(Note that Structured Streaming file sources don't support these options.) + +* `modifiedBefore`: an optional timestamp to only include files with +modification times occurring before the specified time. The provided timestamp +must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) +* `modifiedAfter`: an optional timestamp to only include files with +modification times occurring after the specified time. The provided timestamp +must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + +When a timezone option is not provided, the timestamps will be interpreted according +to the Spark session timezone (`spark.sql.session.timeZone`). + +To load files with paths matching a given modified time range, you can use: + +
+
+{% include_example load_with_modified_time_filter scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %} +
+ +
+{% include_example load_with_modified_time_filter java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %} +
+ +
+{% include_example load_with_modified_time_filter python/sql/datasource.py %} +
+ +
+{% include_example load_with_modified_time_filter r/RSparkSQLExample.R %} +
+
\ No newline at end of file diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java index 2295225387..46e740d78b 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java @@ -147,6 +147,22 @@ public class JavaSQLDataSourceExample { // |file1.parquet| // +-------------+ // $example off:load_with_path_glob_filter$ + // $example on:load_with_modified_time_filter$ + Dataset beforeFilterDF = spark.read().format("parquet") + // Only load files modified before 7/1/2020 at 05:30 + .option("modifiedBefore", "2020-07-01T05:30:00") + // Only load files modified after 6/1/2020 at 05:30 + .option("modifiedAfter", "2020-06-01T05:30:00") + // Interpret both times above relative to CST timezone + .option("timeZone", "CST") + .load("examples/src/main/resources/dir1"); + beforeFilterDF.show(); + // +-------------+ + // | file| + // +-------------+ + // |file1.parquet| + // +-------------+ + // $example off:load_with_modified_time_filter$ } private static void runBasicDataSourceExample(SparkSession spark) { diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py index eecd8c2d84..8c146ba0c9 100644 --- a/examples/src/main/python/sql/datasource.py +++ b/examples/src/main/python/sql/datasource.py @@ -67,6 +67,26 @@ def generic_file_source_options_example(spark): # +-------------+ # $example off:load_with_path_glob_filter$ + # $example on:load_with_modified_time_filter$ + # Only load files modified before 07/1/2050 @ 08:30:00 + df = spark.read.load("examples/src/main/resources/dir1", + format="parquet", modifiedBefore="2050-07-01T08:30:00") + df.show() + # +-------------+ + # | file| + # +-------------+ + # |file1.parquet| + # +-------------+ + # Only load files modified after 06/01/2050 @ 08:30:00 + df = spark.read.load("examples/src/main/resources/dir1", + format="parquet", modifiedAfter="2050-06-01T08:30:00") + df.show() + # +-------------+ + # | file| + # +-------------+ + # +-------------+ + # $example off:load_with_modified_time_filter$ + def basic_datasource_example(spark): # $example on:generic_load_save_functions$ diff --git a/examples/src/main/r/RSparkSQLExample.R b/examples/src/main/r/RSparkSQLExample.R index 8685cfb5c0..86ad533424 100644 --- a/examples/src/main/r/RSparkSQLExample.R +++ b/examples/src/main/r/RSparkSQLExample.R @@ -144,6 +144,14 @@ df <- read.df("examples/src/main/resources/dir1", "parquet", pathGlobFilter = "* # 1 file1.parquet # $example off:load_with_path_glob_filter$ +# $example on:load_with_modified_time_filter$ +beforeDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedBefore= "2020-07-01T05:30:00") +# file +# 1 file1.parquet +afterDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedAfter = "2020-06-01T05:30:00") +# file +# $example off:load_with_modified_time_filter$ + # $example on:manual_save_options_orc$ df <- read.df("examples/src/main/resources/users.orc", "orc") write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name") diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala index 2c7abfcd33..90c0eeb5ba 100644 --- 
a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala @@ -81,6 +81,27 @@ object SQLDataSourceExample { // |file1.parquet| // +-------------+ // $example off:load_with_path_glob_filter$ + // $example on:load_with_modified_time_filter$ + val beforeFilterDF = spark.read.format("parquet") + // Files modified before 07/01/2020 at 05:30 are allowed + .option("modifiedBefore", "2020-07-01T05:30:00") + .load("examples/src/main/resources/dir1"); + beforeFilterDF.show(); + // +-------------+ + // | file| + // +-------------+ + // |file1.parquet| + // +-------------+ + val afterFilterDF = spark.read.format("parquet") + // Files modified after 06/01/2020 at 05:30 are allowed + .option("modifiedAfter", "2020-06-01T05:30:00") + .load("examples/src/main/resources/dir1"); + afterFilterDF.show(); + // +-------------+ + // | file| + // +-------------+ + // +-------------+ + // $example off:load_with_modified_time_filter$ } private def runBasicDataSourceExample(spark: SparkSession): Unit = { diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 2ed991c87f..bb31e6a3e0 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -125,6 +125,12 @@ class DataFrameReader(OptionUtils): * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. It does not change the behavior of partition discovery. + * ``modifiedBefore``: an optional timestamp to only include files with + modification times occurring before the specified time. The provided timestamp + must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + * ``modifiedAfter``: an optional timestamp to only include files with + modification times occurring after the specified time. The provided timestamp + must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) """ self._jreader = self._jreader.option(key, to_str(value)) return self @@ -149,6 +155,12 @@ class DataFrameReader(OptionUtils): * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. It does not change the behavior of partition discovery. + * ``modifiedBefore``: an optional timestamp to only include files with + modification times occurring before the specified time. The provided timestamp + must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + * ``modifiedAfter``: an optional timestamp to only include files with + modification times occurring after the specified time. The provided timestamp + must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) """ for k in options: self._jreader = self._jreader.option(k, to_str(options[k])) @@ -203,7 +215,8 @@ class DataFrameReader(OptionUtils): mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None, dropFieldIfAllNull=None, encoding=None, locale=None, pathGlobFilter=None, - recursiveFileLookup=None, allowNonNumericNumbers=None): + recursiveFileLookup=None, allowNonNumericNumbers=None, + modifiedBefore=None, modifiedAfter=None): """ Loads JSON files and returns the results as a :class:`DataFrame`. @@ -322,6 +335,13 @@ class DataFrameReader(OptionUtils): ``+Infinity`` and ``Infinity``. 
* ``-INF``: for negative infinity, alias ``-Infinity``. * ``NaN``: for other not-a-numbers, like result of division by zero. + modifiedBefore : an optional timestamp to only include files with + modification times occurring before the specified time. The provided timestamp + must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + modifiedAfter : an optional timestamp to only include files with + modification times occurring after the specified time. The provided timestamp + must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + Examples -------- @@ -344,6 +364,7 @@ class DataFrameReader(OptionUtils): allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep, samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding, locale=locale, pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup, + modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter, allowNonNumericNumbers=allowNonNumericNumbers) if isinstance(path, str): path = [path] @@ -410,6 +431,15 @@ class DataFrameReader(OptionUtils): disables `partition discovery `_. # noqa + modification times occurring before the specified time. The provided timestamp + must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + modifiedBefore (batch only) : an optional timestamp to only include files with + modification times occurring before the specified time. The provided timestamp + must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + modifiedAfter (batch only) : an optional timestamp to only include files with + modification times occurring after the specified time. The provided timestamp + must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + Examples -------- >>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned') @@ -418,13 +448,18 @@ class DataFrameReader(OptionUtils): """ mergeSchema = options.get('mergeSchema', None) pathGlobFilter = options.get('pathGlobFilter', None) + modifiedBefore = options.get('modifiedBefore', None) + modifiedAfter = options.get('modifiedAfter', None) recursiveFileLookup = options.get('recursiveFileLookup', None) self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter, - recursiveFileLookup=recursiveFileLookup) + recursiveFileLookup=recursiveFileLookup, modifiedBefore=modifiedBefore, + modifiedAfter=modifiedAfter) + return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths))) def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None, - recursiveFileLookup=None): + recursiveFileLookup=None, modifiedBefore=None, + modifiedAfter=None): """ Loads text files and returns a :class:`DataFrame` whose schema starts with a string column named "value", and followed by partitioned columns if there @@ -453,6 +488,15 @@ class DataFrameReader(OptionUtils): recursively scan a directory for files. Using this option disables `partition discovery `_. # noqa + modification times occurring before the specified time. The provided timestamp + must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + modifiedBefore (batch only) : an optional timestamp to only include files with + modification times occurring before the specified time. The provided timestamp + must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + modifiedAfter (batch only) : an optional timestamp to only include files with + modification times occurring after the specified time. 
The provided timestamp + must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + Examples -------- >>> df = spark.read.text('python/test_support/sql/text-test.txt') @@ -464,7 +508,9 @@ class DataFrameReader(OptionUtils): """ self._set_opts( wholetext=wholetext, lineSep=lineSep, pathGlobFilter=pathGlobFilter, - recursiveFileLookup=recursiveFileLookup) + recursiveFileLookup=recursiveFileLookup, modifiedBefore=modifiedBefore, + modifiedAfter=modifiedAfter) + if isinstance(paths, str): paths = [paths] return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths))) @@ -476,7 +522,7 @@ class DataFrameReader(OptionUtils): maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None, samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None, - pathGlobFilter=None, recursiveFileLookup=None): + pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None): r"""Loads a CSV file and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -631,6 +677,15 @@ class DataFrameReader(OptionUtils): recursively scan a directory for files. Using this option disables `partition discovery `_. # noqa + modification times occurring before the specified time. The provided timestamp + must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + modifiedBefore (batch only) : an optional timestamp to only include files with + modification times occurring before the specified time. The provided timestamp + must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + modifiedAfter (batch only) : an optional timestamp to only include files with + modification times occurring after the specified time. The provided timestamp + must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + Examples -------- >>> df = spark.read.csv('python/test_support/sql/ages.csv') @@ -652,7 +707,8 @@ class DataFrameReader(OptionUtils): columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine, charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio, enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep, - pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup) + pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup, + modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter) if isinstance(path, str): path = [path] if type(path) == list: @@ -679,7 +735,8 @@ class DataFrameReader(OptionUtils): else: raise TypeError("path can be only string, list or RDD") - def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None): + def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None, + modifiedBefore=None, modifiedAfter=None): """Loads ORC files, returning the result as a :class:`DataFrame`. .. versionadded:: 1.5.0 @@ -701,6 +758,15 @@ class DataFrameReader(OptionUtils): disables `partition discovery `_. # noqa + modification times occurring before the specified time. The provided timestamp + must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + modifiedBefore : an optional timestamp to only include files with + modification times occurring before the specified time. The provided timestamp + must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 
2020-06-01T13:00:00) + modifiedAfter : an optional timestamp to only include files with + modification times occurring after the specified time. The provided timestamp + must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) + Examples -------- >>> df = spark.read.orc('python/test_support/sql/orc_partitioned') @@ -708,6 +774,7 @@ class DataFrameReader(OptionUtils): [('a', 'bigint'), ('b', 'int'), ('c', 'int')] """ self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter, + modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter, recursiveFileLookup=recursiveFileLookup) if isinstance(path, str): path = [path] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 276d5d29bf..b26bc6441b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -493,6 +493,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
  • `pathGlobFilter`: an optional glob pattern to only include files with paths matching * the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. * It does not change the behavior of partition discovery.
  • + *
  • `modifiedBefore` (batch only): an optional timestamp to only include files with + * modification times occurring before the specified Time. The provided timestamp + * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
  • + *
  • `modifiedAfter` (batch only): an optional timestamp to only include files with + * modification times occurring after the specified Time. The provided timestamp + * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
  • *
  • `recursiveFileLookup`: recursively scan a directory for files. Using this option * disables partition discovery
  • *
  • `allowNonNumericNumbers` (default `true`): allows JSON parser to recognize set of @@ -750,6 +756,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
  • `pathGlobFilter`: an optional glob pattern to only include files with paths matching * the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. * It does not change the behavior of partition discovery.
  • + *
  • `modifiedBefore` (batch only): an optional timestamp to only include files with + * modification times occurring before the specified Time. The provided timestamp + * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
  • + *
  • `modifiedAfter` (batch only): an optional timestamp to only include files with + * modification times occurring after the specified Time. The provided timestamp + * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
  • *
  • `recursiveFileLookup`: recursively scan a directory for files. Using this option * disables partition discovery
  • * @@ -781,6 +793,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
  • `pathGlobFilter`: an optional glob pattern to only include files with paths matching * the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. * It does not change the behavior of partition discovery.
  • + *
  • `modifiedBefore` (batch only): an optional timestamp to only include files with + * modification times occurring before the specified Time. The provided timestamp + * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
  • + *
  • `modifiedAfter` (batch only): an optional timestamp to only include files with + * modification times occurring after the specified Time. The provided timestamp + * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
  • *
  • `recursiveFileLookup`: recursively scan a directory for files. Using this option * disables partition discovery
  • * @@ -814,6 +832,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
  • `pathGlobFilter`: an optional glob pattern to only include files with paths matching * the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. * It does not change the behavior of partition discovery.
  • + *
  • `modifiedBefore` (batch only): an optional timestamp to only include files with + * modification times occurring before the specified Time. The provided timestamp + * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
  • + *
  • `modifiedAfter` (batch only): an optional timestamp to only include files with + * modification times occurring after the specified Time. The provided timestamp + * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
  • *
  • `recursiveFileLookup`: recursively scan a directory for files. Using this option * disables partition discovery
  • * @@ -880,6 +904,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
  • `pathGlobFilter`: an optional glob pattern to only include files with paths matching * the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. * It does not change the behavior of partition discovery.
  • + *
  • `modifiedBefore` (batch only): an optional timestamp to only include files with + * modification times occurring before the specified Time. The provided timestamp + * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
  • + *
  • `modifiedAfter` (batch only): an optional timestamp to only include files with + * modification times occurring after the specified Time. The provided timestamp + * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
  • *
  • `recursiveFileLookup`: recursively scan a directory for files. Using this option * disables partition discovery
  • * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index fed9614347..5b0d0606da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -57,13 +57,10 @@ abstract class PartitioningAwareFileIndex( protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] private val caseInsensitiveMap = CaseInsensitiveMap(parameters) + private val pathFilters = PathFilterFactory.create(caseInsensitiveMap) - protected lazy val pathGlobFilter: Option[GlobFilter] = - caseInsensitiveMap.get("pathGlobFilter").map(new GlobFilter(_)) - - protected def matchGlobPattern(file: FileStatus): Boolean = { - pathGlobFilter.forall(_.accept(file.getPath)) - } + protected def matchPathPattern(file: FileStatus): Boolean = + pathFilters.forall(_.accept(file)) protected lazy val recursiveFileLookup: Boolean = { caseInsensitiveMap.getOrElse("recursiveFileLookup", "false").toBoolean @@ -86,7 +83,7 @@ abstract class PartitioningAwareFileIndex( val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match { case Some(existingDir) => // Directory has children files in it, return them - existingDir.filter(f => matchGlobPattern(f) && isNonEmptyFile(f)) + existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f)) case None => // Directory does not exist, or has no children files @@ -135,7 +132,7 @@ abstract class PartitioningAwareFileIndex( } else { leafFiles.values.toSeq } - files.filter(matchGlobPattern) + files.filter(matchPathPattern) } protected def inferPartitioning(): PartitionSpec = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathFilters.scala new file mode 100644 index 0000000000..c8f23988f9 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathFilters.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import java.util.{Locale, TimeZone} + +import org.apache.hadoop.fs.{FileStatus, GlobFilter} + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.unsafe.types.UTF8String + +trait PathFilterStrategy extends Serializable { + def accept(fileStatus: FileStatus): Boolean +} + +trait StrategyBuilder { + def create(parameters: CaseInsensitiveMap[String]): Option[PathFilterStrategy] +} + +class PathGlobFilter(filePatten: String) extends PathFilterStrategy { + + private val globFilter = new GlobFilter(filePatten) + + override def accept(fileStatus: FileStatus): Boolean = + globFilter.accept(fileStatus.getPath) +} + +object PathGlobFilter extends StrategyBuilder { + val PARAM_NAME = "pathglobfilter" + + override def create(parameters: CaseInsensitiveMap[String]): Option[PathFilterStrategy] = { + parameters.get(PARAM_NAME).map(new PathGlobFilter(_)) + } +} + +/** + * Provide modifiedAfter and modifiedBefore options when + * filtering from a batch-based file data source. + * + * Example Usages + * Load all CSV files modified after date: + * {{{ + * spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load() + * }}} + * + * Load all CSV files modified before date: + * {{{ + * spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load() + * }}} + * + * Load all CSV files modified between two dates: + * {{{ + * spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00") + * .option("modifiedBefore","2020-06-15T05:00:00").load() + * }}} + */ +abstract class ModifiedDateFilter extends PathFilterStrategy { + + def timeZoneId: String + + protected def localTime(micros: Long): Long = + DateTimeUtils.fromUTCTime(micros, timeZoneId) +} + +object ModifiedDateFilter { + + def getTimeZoneId(options: CaseInsensitiveMap[String]): String = { + options.getOrElse( + DateTimeUtils.TIMEZONE_OPTION.toLowerCase(Locale.ROOT), + SQLConf.get.sessionLocalTimeZone) + } + + def toThreshold(timeString: String, timeZoneId: String, strategy: String): Long = { + val timeZone: TimeZone = DateTimeUtils.getTimeZone(timeZoneId) + val ts = UTF8String.fromString(timeString) + DateTimeUtils.stringToTimestamp(ts, timeZone.toZoneId).getOrElse { + throw new AnalysisException( + s"The timestamp provided for the '$strategy' option is invalid. The expected format " + + s"is 'YYYY-MM-DDTHH:mm:ss', but the provided timestamp: $timeString") + } + } +} + +/** + * Filter used to determine whether file was modified before the provided timestamp. 
+ */ +class ModifiedBeforeFilter(thresholdTime: Long, val timeZoneId: String) + extends ModifiedDateFilter { + + override def accept(fileStatus: FileStatus): Boolean = + // We standardize on microseconds wherever possible + // getModificationTime returns in milliseconds + thresholdTime - localTime(DateTimeUtils.millisToMicros(fileStatus.getModificationTime)) > 0 +} + +object ModifiedBeforeFilter extends StrategyBuilder { + import ModifiedDateFilter._ + + val PARAM_NAME = "modifiedbefore" + + override def create(parameters: CaseInsensitiveMap[String]): Option[PathFilterStrategy] = { + parameters.get(PARAM_NAME).map { value => + val timeZoneId = getTimeZoneId(parameters) + val thresholdTime = toThreshold(value, timeZoneId, PARAM_NAME) + new ModifiedBeforeFilter(thresholdTime, timeZoneId) + } + } +} + +/** + * Filter used to determine whether file was modified after the provided timestamp. + */ +class ModifiedAfterFilter(thresholdTime: Long, val timeZoneId: String) + extends ModifiedDateFilter { + + override def accept(fileStatus: FileStatus): Boolean = + // getModificationTime returns in milliseconds + // We standardize on microseconds wherever possible + localTime(DateTimeUtils.millisToMicros(fileStatus.getModificationTime)) - thresholdTime > 0 +} + +object ModifiedAfterFilter extends StrategyBuilder { + import ModifiedDateFilter._ + + val PARAM_NAME = "modifiedafter" + + override def create(parameters: CaseInsensitiveMap[String]): Option[PathFilterStrategy] = { + parameters.get(PARAM_NAME).map { value => + val timeZoneId = getTimeZoneId(parameters) + val thresholdTime = toThreshold(value, timeZoneId, PARAM_NAME) + new ModifiedAfterFilter(thresholdTime, timeZoneId) + } + } +} + +object PathFilterFactory { + + private val strategies = + Seq(PathGlobFilter, ModifiedBeforeFilter, ModifiedAfterFilter) + + def create(parameters: CaseInsensitiveMap[String]): Seq[PathFilterStrategy] = { + strategies.flatMap { _.create(parameters) } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index 712ed1585b..6f43542fd6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -23,6 +23,7 @@ import scala.util.Try import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.execution.datasources.{ModifiedAfterFilter, ModifiedBeforeFilter} import org.apache.spark.util.Utils /** @@ -32,6 +33,16 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters)) + checkDisallowedOptions(parameters) + + private def checkDisallowedOptions(options: Map[String, String]): Unit = { + Seq(ModifiedBeforeFilter.PARAM_NAME, ModifiedAfterFilter.PARAM_NAME).foreach { param => + if (parameters.contains(param)) { + throw new IllegalArgumentException(s"option '$param' is not allowed in file stream sources") + } + } + } + val maxFilesPerTrigger: Option[Int] = parameters.get("maxFilesPerTrigger").map { str => Try(str.toInt).toOption.filter(_ > 0).getOrElse { throw new IllegalArgumentException( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 
b27c114518..876f62803d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -577,38 +577,6 @@ class FileBasedDataSourceSuite extends QueryTest } } - test("Option pathGlobFilter: filter files correctly") { - withTempPath { path => - val dataDir = path.getCanonicalPath - Seq("foo").toDS().write.text(dataDir) - Seq("bar").toDS().write.mode("append").orc(dataDir) - val df = spark.read.option("pathGlobFilter", "*.txt").text(dataDir) - checkAnswer(df, Row("foo")) - - // Both glob pattern in option and path should be effective to filter files. - val df2 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*.orc") - checkAnswer(df2, Seq.empty) - - val df3 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*xt") - checkAnswer(df3, Row("foo")) - } - } - - test("Option pathGlobFilter: simple extension filtering should contains partition info") { - withTempPath { path => - val input = Seq(("foo", 1), ("oof", 2)).toDF("a", "b") - input.write.partitionBy("b").text(path.getCanonicalPath) - Seq("bar").toDS().write.mode("append").orc(path.getCanonicalPath + "/b=1") - - // If we use glob pattern in the path, the partition column won't be shown in the result. - val df = spark.read.text(path.getCanonicalPath + "/*/*.txt") - checkAnswer(df, input.select("a")) - - val df2 = spark.read.option("pathGlobFilter", "*.txt").text(path.getCanonicalPath) - checkAnswer(df2, input) - } - } - test("Option recursiveFileLookup: recursive loading correctly") { val expectedFileList = mutable.ListBuffer[String]() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterStrategySuite.scala new file mode 100644 index 0000000000..b965a78c9e --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterStrategySuite.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.test.SharedSparkSession + +class PathFilterStrategySuite extends QueryTest with SharedSparkSession { + + test("SPARK-31962: PathFilterStrategies - modifiedAfter option") { + val options = + CaseInsensitiveMap[String](Map("modifiedAfter" -> "2010-10-01T01:01:00")) + val strategy = PathFilterFactory.create(options) + assert(strategy.head.isInstanceOf[ModifiedAfterFilter]) + assert(strategy.size == 1) + } + + test("SPARK-31962: PathFilterStrategies - modifiedBefore option") { + val options = + CaseInsensitiveMap[String](Map("modifiedBefore" -> "2020-10-01T01:01:00")) + val strategy = PathFilterFactory.create(options) + assert(strategy.head.isInstanceOf[ModifiedBeforeFilter]) + assert(strategy.size == 1) + } + + test("SPARK-31962: PathFilterStrategies - pathGlobFilter option") { + val options = CaseInsensitiveMap[String](Map("pathGlobFilter" -> "*.txt")) + val strategy = PathFilterFactory.create(options) + assert(strategy.head.isInstanceOf[PathGlobFilter]) + assert(strategy.size == 1) + } + + test("SPARK-31962: PathFilterStrategies - no options") { + val options = CaseInsensitiveMap[String](Map.empty) + val strategy = PathFilterFactory.create(options) + assert(strategy.isEmpty) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala new file mode 100644 index 0000000000..1af2adfd86 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.time.{LocalDateTime, ZoneId, ZoneOffset} +import java.time.format.DateTimeFormatter + +import scala.util.Random + +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils} +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{StringType, StructField, StructType} + +class PathFilterSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + + test("SPARK-31962: modifiedBefore specified" + + " and sharing same timestamp with file last modified time.") { + withTempDir { dir => + val curTime = LocalDateTime.now(ZoneOffset.UTC) + executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formatTime(curTime))) + } + } + + test("SPARK-31962: modifiedAfter specified" + + " and sharing same timestamp with file last modified time.") { + withTempDir { dir => + val curTime = LocalDateTime.now(ZoneOffset.UTC) + executeTest(dir, Seq(curTime), 0, modifiedAfter = Some(formatTime(curTime))) + } + } + + test("SPARK-31962: modifiedBefore and modifiedAfter option" + + " share same timestamp with file last modified time.") { + withTempDir { dir => + val curTime = LocalDateTime.now(ZoneOffset.UTC) + val formattedTime = formatTime(curTime) + executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formattedTime), + modifiedAfter = Some(formattedTime)) + } + } + + test("SPARK-31962: modifiedBefore and modifiedAfter option" + + " share same timestamp with earlier file last modified time.") { + withTempDir { dir => + val curTime = LocalDateTime.now(ZoneOffset.UTC) + val fileTime = curTime.minusDays(3) + val formattedTime = formatTime(curTime) + executeTest(dir, Seq(fileTime), 0, modifiedBefore = Some(formattedTime), + modifiedAfter = Some(formattedTime)) + } + } + + test("SPARK-31962: modifiedBefore and modifiedAfter option" + + " share same timestamp with later file last modified time.") { + withTempDir { dir => + val curTime = LocalDateTime.now(ZoneOffset.UTC) + val formattedTime = formatTime(curTime) + executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formattedTime), + modifiedAfter = Some(formattedTime)) + } + } + + test("SPARK-31962: when modifiedAfter specified with a past date") { + withTempDir { dir => + val curTime = LocalDateTime.now(ZoneOffset.UTC) + val pastTime = curTime.minusYears(1) + val formattedTime = formatTime(pastTime) + executeTest(dir, Seq(curTime), 1, modifiedAfter = Some(formattedTime)) + } + } + + test("SPARK-31962: when modifiedBefore specified with a future date") { + withTempDir { dir => + val curTime = LocalDateTime.now(ZoneOffset.UTC) + val futureTime = curTime.plusYears(1) + val formattedTime = formatTime(futureTime) + executeTest(dir, Seq(curTime), 1, modifiedBefore = Some(formattedTime)) + } + } + + test("SPARK-31962: with modifiedBefore option provided using a past date") { + withTempDir { dir => + val curTime = LocalDateTime.now(ZoneOffset.UTC) + val pastTime = curTime.minusYears(1) + val formattedTime = formatTime(pastTime) + executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formattedTime)) + } + } + + test("SPARK-31962: modifiedAfter specified with a past date, multiple files, one valid") { + withTempDir { dir => + val fileTime1 = LocalDateTime.now(ZoneOffset.UTC) + val fileTime2 = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC) + val pastTime = fileTime1.minusYears(1) + val formattedTime = formatTime(pastTime) + executeTest(dir, Seq(fileTime1, 
fileTime2), 1, modifiedAfter = Some(formattedTime)) + } + } + + test("SPARK-31962: modifiedAfter specified with a past date, multiple files, both valid") { + withTempDir { dir => + val curTime = LocalDateTime.now(ZoneOffset.UTC) + val pastTime = curTime.minusYears(1) + val formattedTime = formatTime(pastTime) + executeTest(dir, Seq(curTime, curTime), 2, modifiedAfter = Some(formattedTime)) + } + } + + test("SPARK-31962: modifiedAfter specified with a past date, multiple files, none valid") { + withTempDir { dir => + val fileTime = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC) + val pastTime = LocalDateTime.now(ZoneOffset.UTC).minusYears(1) + val formattedTime = formatTime(pastTime) + executeTest(dir, Seq(fileTime, fileTime), 0, modifiedAfter = Some(formattedTime)) + } + } + + test("SPARK-31962: modifiedBefore specified with a future date, multiple files, both valid") { + withTempDir { dir => + val fileTime = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC) + val futureTime = LocalDateTime.now(ZoneOffset.UTC).plusYears(1) + val formattedTime = formatTime(futureTime) + executeTest(dir, Seq(fileTime, fileTime), 2, modifiedBefore = Some(formattedTime)) + } + } + + test("SPARK-31962: modifiedBefore specified with a future date, multiple files, one valid") { + withTempDir { dir => + val curTime = LocalDateTime.now(ZoneOffset.UTC) + val fileTime1 = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC) + val fileTime2 = curTime.plusDays(3) + val formattedTime = formatTime(curTime) + executeTest(dir, Seq(fileTime1, fileTime2), 1, modifiedBefore = Some(formattedTime)) + } + } + + test("SPARK-31962: modifiedBefore specified with a future date, multiple files, none valid") { + withTempDir { dir => + val fileTime = LocalDateTime.now(ZoneOffset.UTC).minusDays(1) + val formattedTime = formatTime(fileTime) + executeTest(dir, Seq(fileTime, fileTime), 0, modifiedBefore = Some(formattedTime)) + } + } + + test("SPARK-31962: modifiedBefore/modifiedAfter is specified with an invalid date") { + executeTestWithBadOption( + Map("modifiedBefore" -> "2024-05+1 01:00:00"), + Seq("The timestamp provided", "modifiedbefore", "2024-05+1 01:00:00")) + + executeTestWithBadOption( + Map("modifiedAfter" -> "2024-05+1 01:00:00"), + Seq("The timestamp provided", "modifiedafter", "2024-05+1 01:00:00")) + } + + test("SPARK-31962: modifiedBefore/modifiedAfter - empty option") { + executeTestWithBadOption( + Map("modifiedBefore" -> ""), + Seq("The timestamp provided", "modifiedbefore")) + + executeTestWithBadOption( + Map("modifiedAfter" -> ""), + Seq("The timestamp provided", "modifiedafter")) + } + + test("SPARK-31962: modifiedBefore/modifiedAfter filter takes into account local timezone " + + "when specified as an option.") { + Seq("modifiedbefore", "modifiedafter").foreach { filterName => + // CET = UTC + 1 hour, HST = UTC - 10 hours + Seq("CET", "HST").foreach { tzId => + testModifiedDateFilterWithTimezone(tzId, filterName) + } + } + } + + test("Option pathGlobFilter: filter files correctly") { + withTempPath { path => + val dataDir = path.getCanonicalPath + Seq("foo").toDS().write.text(dataDir) + Seq("bar").toDS().write.mode("append").orc(dataDir) + val df = spark.read.option("pathGlobFilter", "*.txt").text(dataDir) + checkAnswer(df, Row("foo")) + + // Both glob pattern in option and path should be effective to filter files. 
+ val df2 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*.orc") + checkAnswer(df2, Seq.empty) + + val df3 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*xt") + checkAnswer(df3, Row("foo")) + } + } + + test("Option pathGlobFilter: simple extension filtering should contains partition info") { + withTempPath { path => + val input = Seq(("foo", 1), ("oof", 2)).toDF("a", "b") + input.write.partitionBy("b").text(path.getCanonicalPath) + Seq("bar").toDS().write.mode("append").orc(path.getCanonicalPath + "/b=1") + + // If we use glob pattern in the path, the partition column won't be shown in the result. + val df = spark.read.text(path.getCanonicalPath + "/*/*.txt") + checkAnswer(df, input.select("a")) + + val df2 = spark.read.option("pathGlobFilter", "*.txt").text(path.getCanonicalPath) + checkAnswer(df2, input) + } + } + + private def executeTest( + dir: File, + fileDates: Seq[LocalDateTime], + expectedCount: Long, + modifiedBefore: Option[String] = None, + modifiedAfter: Option[String] = None): Unit = { + fileDates.foreach { fileDate => + val file = createSingleFile(dir) + setFileTime(fileDate, file) + } + + val schema = StructType(Seq(StructField("a", StringType))) + + var dfReader = spark.read.format("csv").option("timeZone", "UTC").schema(schema) + modifiedBefore.foreach { opt => dfReader = dfReader.option("modifiedBefore", opt) } + modifiedAfter.foreach { opt => dfReader = dfReader.option("modifiedAfter", opt) } + + if (expectedCount > 0) { + // without pathGlobFilter + val df1 = dfReader.load(dir.getCanonicalPath) + assert(df1.count() === expectedCount) + + // pathGlobFilter matched + val df2 = dfReader.option("pathGlobFilter", "*.csv").load(dir.getCanonicalPath) + assert(df2.count() === expectedCount) + + // pathGlobFilter mismatched + val df3 = dfReader.option("pathGlobFilter", "*.txt").load(dir.getCanonicalPath) + assert(df3.count() === 0) + } else { + val df = dfReader.load(dir.getCanonicalPath) + assert(df.count() === 0) + } + } + + private def executeTestWithBadOption( + options: Map[String, String], + expectedMsgParts: Seq[String]): Unit = { + withTempDir { dir => + createSingleFile(dir) + val exc = intercept[AnalysisException] { + var dfReader = spark.read.format("csv") + options.foreach { case (key, value) => + dfReader = dfReader.option(key, value) + } + dfReader.load(dir.getCanonicalPath) + } + expectedMsgParts.foreach { msg => assert(exc.getMessage.contains(msg)) } + } + } + + private def testModifiedDateFilterWithTimezone( + timezoneId: String, + filterParamName: String): Unit = { + val curTime = LocalDateTime.now(ZoneOffset.UTC) + val zoneId: ZoneId = DateTimeUtils.getTimeZone(timezoneId).toZoneId + val strategyTimeInMicros = + ModifiedDateFilter.toThreshold( + curTime.toString, + timezoneId, + filterParamName) + val strategyTimeInSeconds = strategyTimeInMicros / 1000 / 1000 + + val curTimeAsSeconds = curTime.atZone(zoneId).toEpochSecond + withClue(s"timezone: $timezoneId / param: $filterParamName,") { + assert(strategyTimeInSeconds === curTimeAsSeconds) + } + } + + private def createSingleFile(dir: File): File = { + val file = new File(dir, "temp" + Random.nextInt(1000000) + ".csv") + stringToFile(file, "text") + } + + private def setFileTime(time: LocalDateTime, file: File): Boolean = { + val sameTime = time.toEpochSecond(ZoneOffset.UTC) + file.setLastModified(sameTime * 1000) + } + + private def formatTime(time: LocalDateTime): String = { + time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + } +} diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index cf9664a976..718095003b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.streaming import java.io.File import java.net.URI +import java.time.{LocalDateTime, ZoneOffset} +import java.time.format.DateTimeFormatter import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable @@ -40,7 +42,6 @@ import org.apache.spark.sql.execution.streaming.sources.MemorySink import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types._ import org.apache.spark.sql.types.{StructType, _} import org.apache.spark.util.Utils @@ -2054,6 +2055,47 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + test("SPARK-31962: file stream source shouldn't allow modifiedBefore/modifiedAfter") { + def formatTime(time: LocalDateTime): String = { + time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) + } + + def assertOptionIsNotSupported(options: Map[String, String], path: String): Unit = { + val schema = StructType(Seq(StructField("a", StringType))) + var dsReader = spark.readStream + .format("csv") + .option("timeZone", "UTC") + .schema(schema) + + options.foreach { case (k, v) => dsReader = dsReader.option(k, v) } + + val df = dsReader.load(path) + + testStream(df)( + ExpectFailure[IllegalArgumentException]( + t => assert(t.getMessage.contains("is not allowed in file stream source")), + isFatalError = false) + ) + } + + withTempDir { dir => + // "modifiedBefore" + val futureTime = LocalDateTime.now(ZoneOffset.UTC).plusYears(1) + val formattedFutureTime = formatTime(futureTime) + assertOptionIsNotSupported(Map("modifiedBefore" -> formattedFutureTime), dir.getCanonicalPath) + + // "modifiedAfter" + val prevTime = LocalDateTime.now(ZoneOffset.UTC).minusYears(1) + val formattedPrevTime = formatTime(prevTime) + assertOptionIsNotSupported(Map("modifiedAfter" -> formattedPrevTime), dir.getCanonicalPath) + + // both + assertOptionIsNotSupported( + Map("modifiedBefore" -> formattedFutureTime, "modifiedAfter" -> formattedPrevTime), + dir.getCanonicalPath) + } + } + private def createFile(content: String, src: File, tmp: File): File = { val tempFile = Utils.tempFileWith(new File(tmp, "text")) val finalFile = new File(src, tempFile.getName)