[SPARK-27490][SQL] File source V2: return correct result for Dataset.inputFiles()

## What changes were proposed in this pull request?

Currently, a `Dataset` backed by file source V2 always returns an empty result for the method `Dataset.inputFiles()`.

This PR fixes it by handling `DataSourceV2Relation` over a `FileTable` in `Dataset.inputFiles()`, so the input files are collected from the table's file index.
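For illustration, a minimal reproduction outside the test suite (the session setup and paths below are hypothetical and not part of this patch): with a file source read through the V2 path, `inputFiles()` previously came back empty, and after this change it lists the underlying data files.

```scala
import org.apache.spark.sql.SparkSession

object InputFilesRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("inputFiles repro")
      .getOrCreate()

    // Hypothetical scratch directory holding a few CSV part files.
    val csvDir = "/tmp/input-files-demo/csv"
    spark.range(10).write.mode("overwrite").csv(csvDir)

    // When the CSV source runs through the V2 read path, this returned
    // Array() before the patch and the part-file paths after it.
    val df = spark.read.csv(csvDir)
    df.inputFiles.foreach(println)

    spark.stop()
  }
}
```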

## How was this patch tested?

Unit test

Closes #24393 from gengliangwang/inputFiles.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Gengliang Wang 2019-04-18 14:39:30 +08:00 committed by Wenchen Fan
parent 50bdc9befa
commit 7d44ba05d1
2 changed files with 20 additions and 13 deletions


@@ -50,6 +50,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.execution.stat.StatFunctions
 import org.apache.spark.sql.streaming.DataStreamWriter
@@ -3175,6 +3176,8 @@ class Dataset[T] private[sql](
         fr.inputFiles
       case r: HiveTableRelation =>
         r.tableMeta.storage.locationUri.map(_.toString).toArray
+      case DataSourceV2Relation(table: FileTable, _, _) =>
+        table.fileIndex.inputFiles
     }.flatten
     files.toSet.toArray
   }
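For readers less familiar with the V2 code path: a `FileTable` keeps a file index that already holds the resolved data files, so the new case only needs to surface it. A rough sketch of what the added branch does, inside the existing `collect` over the Dataset's optimized plan (the surrounding cases are elided here and only partially visible in the hunk above):

```scala
// Sketch of the new branch in Dataset.inputFiles (other cases elided).
val files: Seq[String] = queryExecution.optimizedPlan.collect {
  // A file-based V2 relation wraps a FileTable; its file index already
  // lists the concrete data files backing the scan.
  case DataSourceV2Relation(table: FileTable, _, _) =>
    table.fileIndex.inputFiles
  // ... existing cases for V1 file relations and Hive tables ...
}.flatten
files.toSet.toArray   // de-duplicate, as the method already did
```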


@@ -762,22 +762,26 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }

   test("inputFiles") {
-    withTempDir { dir =>
-      val df = Seq((1, 22)).toDF("a", "b")
+    Seq("csv", "").foreach { useV1List =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1List) {
+        withTempDir { dir =>
+          val df = Seq((1, 22)).toDF("a", "b")

-      val parquetDir = new File(dir, "parquet").getCanonicalPath
-      df.write.parquet(parquetDir)
-      val parquetDF = spark.read.parquet(parquetDir)
-      assert(parquetDF.inputFiles.nonEmpty)
+          val parquetDir = new File(dir, "parquet").getCanonicalPath
+          df.write.parquet(parquetDir)
+          val parquetDF = spark.read.parquet(parquetDir)
+          assert(parquetDF.inputFiles.nonEmpty)

-      val jsonDir = new File(dir, "json").getCanonicalPath
-      df.write.json(jsonDir)
-      val jsonDF = spark.read.json(jsonDir)
-      assert(parquetDF.inputFiles.nonEmpty)
+          val csvDir = new File(dir, "csv").getCanonicalPath
+          df.write.json(csvDir)
+          val csvDF = spark.read.json(csvDir)
+          assert(csvDF.inputFiles.nonEmpty)

-      val unioned = jsonDF.union(parquetDF).inputFiles.sorted
-      val allFiles = (jsonDF.inputFiles ++ parquetDF.inputFiles).distinct.sorted
-      assert(unioned === allFiles)
+          val unioned = csvDF.union(parquetDF).inputFiles.sorted
+          val allFiles = (csvDF.inputFiles ++ parquetDF.inputFiles).distinct.sorted
+          assert(unioned === allFiles)
+        }
+      }
     }
   }
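A note on the test change: wrapping the body in `Seq("csv", "").foreach { useV1List => withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1List) { ... } }` runs the same assertions twice, once with the CSV source kept on the V1 read path and once with the V1 list empty, so `inputFiles()` is exercised through both the V1 and (where implemented) V2 readers.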