[SPARK-28089][SQL] File source v2: support reading output of file streaming Sink

## What changes were proposed in this pull request?

File source V1 supports reading the output of `FileStreamSink` as a batch query: https://github.com/apache/spark/pull/11897
We should support this in file source V2 as well. When reading with paths, we first check whether a `FileStreamSink` metadata log exists for them. If it does, we use `MetadataLogFileIndex` to list the files; otherwise, we fall back to `InMemoryFileIndex`.
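As a user-facing sketch of what this enables (the path and app name below are placeholders, not taken from this patch): a plain batch read over a directory written by a streaming file sink now resolves its file listing the same way on both the V1 and V2 read paths.

```scala
import org.apache.spark.sql.SparkSession

object ReadFileSinkOutputAsBatch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("read-file-sink-output") // placeholder
      .getOrCreate()

    // Placeholder directory previously produced by df.writeStream.format("parquet").start(...).
    // If <dir>/_spark_metadata exists, files are listed from the sink's metadata log
    // (MetadataLogFileIndex), so uncommitted files are skipped; otherwise the regular
    // InMemoryFileIndex listing is used.
    val df = spark.read.parquet("/tmp/streaming-sink-output")
    df.show()

    spark.stop()
  }
}
```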

## How was this patch tested?

Unit test

Closes #24900 from gengliangwang/FileStreamV2.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Gengliang Wang authored on 2019-06-20 12:57:13 +08:00; committed by Wenchen Fan
commit f5107614d6 (parent b276788d57)
5 changed files with 223 additions and 161 deletions

Changed file: FileTable.scala

@@ -20,11 +20,12 @@ import java.util
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
 import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -44,23 +45,37 @@ abstract class FileTable(
     val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
     // Hadoop Configurations are case sensitive.
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
-    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
-      checkEmptyGlobPath = true, checkFilesExist = true)
-    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-    new InMemoryFileIndex(
-      sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache)
+    if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) {
+      // We are reading from the results of a streaming query. We will load files from
+      // the metadata log instead of listing them using HDFS APIs.
+      new MetadataLogFileIndex(sparkSession, new Path(paths.head),
+        options.asScala.toMap, userSpecifiedSchema)
+    } else {
+      // This is a non-streaming file based datasource.
+      val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
+        checkEmptyGlobPath = true, checkFilesExist = true)
+      val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+      new InMemoryFileIndex(
+        sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache)
+    }
   }
 
-  lazy val dataSchema: StructType = userSpecifiedSchema.map { schema =>
-    val partitionSchema = fileIndex.partitionSchema
-    val resolver = sparkSession.sessionState.conf.resolver
-    StructType(schema.filterNot(f => partitionSchema.exists(p => resolver(p.name, f.name))))
-  }.orElse {
-    inferSchema(fileIndex.allFiles())
-  }.getOrElse {
-    throw new AnalysisException(
-      s"Unable to infer schema for $formatName. It must be specified manually.")
-  }.asNullable
+  lazy val dataSchema: StructType = {
+    val schema = userSpecifiedSchema.map { schema =>
+      val partitionSchema = fileIndex.partitionSchema
+      val resolver = sparkSession.sessionState.conf.resolver
+      StructType(schema.filterNot(f => partitionSchema.exists(p => resolver(p.name, f.name))))
+    }.orElse {
+      inferSchema(fileIndex.allFiles())
+    }.getOrElse {
+      throw new AnalysisException(
+        s"Unable to infer schema for $formatName. It must be specified manually.")
+    }
+    fileIndex match {
+      case _: MetadataLogFileIndex => schema
+      case _ => schema.asNullable
+    }
+  }
 
   override lazy val schema: StructType = {
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
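A subtle part of the `dataSchema` change above: when the index is a `MetadataLogFileIndex`, the schema recorded by the sink keeps its original nullability instead of being forced to all-nullable. A minimal snippet (schema literals copied from the test suite below; suitable for `spark-shell`) showing the shape a batch read of the partitioned sink output is expected to report:

```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// "value" stays nullable = false because it comes from the sink's metadata log;
// the partition column "id" is appended with the default nullable = true.
val expectedSchema = new StructType()
  .add(StructField("value", IntegerType, nullable = false))
  .add(StructField("id", IntegerType))

println(expectedSchema.treeString)
```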

Changed file: FileStreamSinkSuite.scala

@@ -25,17 +25,19 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.{AnalysisException, DataFrame}
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, FileScan, FileTable}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 
-class FileStreamSinkSuite extends StreamTest {
+abstract class FileStreamSinkSuite extends StreamTest {
   import testImplicits._
 
   override def beforeAll(): Unit = {
@@ -51,6 +53,8 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 
+  protected def checkQueryExecution(df: DataFrame): Unit
+
   test("unpartitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val df = inputData.toDF()
@@ -121,78 +125,36 @@
     var query: StreamingQuery = null
 
-    // TODO: test file source V2 as well.
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
-      try {
-        query =
-          ds.map(i => (i, i * 1000))
-            .toDF("id", "value")
-            .writeStream
-            .partitionBy("id")
-            .option("checkpointLocation", checkpointDir)
-            .format("parquet")
-            .start(outputDir)
-
-        inputData.addData(1, 2, 3)
-        failAfter(streamingTimeout) {
-          query.processAllAvailable()
-        }
-
-        val outputDf = spark.read.parquet(outputDir)
-        val expectedSchema = new StructType()
-          .add(StructField("value", IntegerType, nullable = false))
-          .add(StructField("id", IntegerType))
-        assert(outputDf.schema === expectedSchema)
-
-        // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
-        // been inferred
-        val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect {
-          case LogicalRelation(baseRelation: HadoopFsRelation, _, _, _) => baseRelation
-        }
-        assert(hadoopdFsRelations.size === 1)
-        assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])
-        assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id"))
-        assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value"))
-
-        // Verify the data is correctly read
-        checkDatasetUnorderly(
-          outputDf.as[(Int, Int)],
-          (1000, 1), (2000, 2), (3000, 3))
-
-        /** Check some condition on the partitions of the FileScanRDD generated by a DF */
-        def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
-          val getFileScanRDD = df.queryExecution.executedPlan.collect {
-            case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] =>
-              scan.inputRDDs().head.asInstanceOf[FileScanRDD]
-          }.headOption.getOrElse {
-            fail(s"No FileScan in query\n${df.queryExecution}")
-          }
-          func(getFileScanRDD.filePartitions)
-        }
-
-        // Read without pruning
-        checkFileScanPartitions(outputDf) { partitions =>
-          // There should be as many distinct partition values as there are distinct ids
-          assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
-        }
-
-        // Read with pruning, should read only files in partition dir id=1
-        checkFileScanPartitions(outputDf.filter("id = 1")) { partitions =>
-          val filesToBeRead = partitions.flatMap(_.files)
-          assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/")))
-          assert(filesToBeRead.map(_.partitionValues).distinct.size === 1)
-        }
-
-        // Read with pruning, should read only files in partition dir id=1 and id=2
-        checkFileScanPartitions(outputDf.filter("id in (1,2)")) { partitions =>
-          val filesToBeRead = partitions.flatMap(_.files)
-          assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/")))
-          assert(filesToBeRead.map(_.partitionValues).distinct.size === 2)
-        }
-      } finally {
-        if (query != null) {
-          query.stop()
-        }
+    try {
+      query =
+        ds.map(i => (i, i * 1000))
+          .toDF("id", "value")
+          .writeStream
+          .partitionBy("id")
+          .option("checkpointLocation", checkpointDir)
+          .format("parquet")
+          .start(outputDir)
+
+      inputData.addData(1, 2, 3)
+      failAfter(streamingTimeout) {
+        query.processAllAvailable()
+      }
+
+      val outputDf = spark.read.parquet(outputDir)
+      val expectedSchema = new StructType()
+        .add(StructField("value", IntegerType, nullable = false))
+        .add(StructField("id", IntegerType))
+      assert(outputDf.schema === expectedSchema)
+
+      // Verify the data is correctly read
+      checkDatasetUnorderly(
+        outputDf.as[(Int, Int)],
+        (1000, 1), (2000, 2), (3000, 3))
+
+      checkQueryExecution(outputDf)
+    } finally {
+      if (query != null) {
+        query.stop()
       }
     }
   }
@@ -512,3 +474,92 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 }
+
+class FileStreamSinkV1Suite extends FileStreamSinkSuite {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "csv,json,orc,text,parquet")
+      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "csv,json,orc,text,parquet")
+
+  override def checkQueryExecution(df: DataFrame): Unit = {
+    // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
+    // been inferred
+    val hadoopdFsRelations = df.queryExecution.analyzed.collect {
+      case LogicalRelation(baseRelation: HadoopFsRelation, _, _, _) => baseRelation
+    }
+    assert(hadoopdFsRelations.size === 1)
+    assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])
+    assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id"))
+    assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value"))
+
+    /** Check some condition on the partitions of the FileScanRDD generated by a DF */
+    def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
+      val getFileScanRDD = df.queryExecution.executedPlan.collect {
+        case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] =>
+          scan.inputRDDs().head.asInstanceOf[FileScanRDD]
+      }.headOption.getOrElse {
+        fail(s"No FileScan in query\n${df.queryExecution}")
+      }
+      func(getFileScanRDD.filePartitions)
+    }
+
+    // Read without pruning
+    checkFileScanPartitions(df) { partitions =>
+      // There should be as many distinct partition values as there are distinct ids
+      assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
+    }
+
+    // Read with pruning, should read only files in partition dir id=1
+    checkFileScanPartitions(df.filter("id = 1")) { partitions =>
+      val filesToBeRead = partitions.flatMap(_.files)
+      assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/")))
+      assert(filesToBeRead.map(_.partitionValues).distinct.size === 1)
+    }
+
+    // Read with pruning, should read only files in partition dir id=1 and id=2
+    checkFileScanPartitions(df.filter("id in (1,2)")) { partitions =>
+      val filesToBeRead = partitions.flatMap(_.files)
+      assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/")))
+      assert(filesToBeRead.map(_.partitionValues).distinct.size === 2)
+    }
+  }
+}
+
+class FileStreamSinkV2Suite extends FileStreamSinkSuite {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
+      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "")
+
+  override def checkQueryExecution(df: DataFrame): Unit = {
+    // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
+    // been inferred
+    val table = df.queryExecution.analyzed.collect {
+      case DataSourceV2Relation(table: FileTable, _, _) => table
+    }
+    assert(table.size === 1)
+    assert(table.head.fileIndex.isInstanceOf[MetadataLogFileIndex])
+    assert(table.head.fileIndex.partitionSchema.exists(_.name == "id"))
+    assert(table.head.dataSchema.exists(_.name == "value"))
+
+    /** Check some condition on the partitions of the FileScanRDD generated by a DF */
+    def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
+      val fileScan = df.queryExecution.executedPlan.collect {
+        case batch: BatchScanExec if batch.scan.isInstanceOf[FileScan] =>
+          batch.scan.asInstanceOf[FileScan]
+      }.headOption.getOrElse {
+        fail(s"No FileScan in query\n${df.queryExecution}")
+      }
+      func(fileScan.planInputPartitions().map(_.asInstanceOf[FilePartition]))
+    }
+
+    // Read without pruning
+    checkFileScanPartitions(df) { partitions =>
+      // There should be as many distinct partition values as there are distinct ids
+      assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
+    }
+
+    // TODO: test partition pruning when file source V2 supports it.
+  }
+}
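The two concrete suites above differ only in how they route file sources: listing a format in the V1 source lists keeps it on the V1 code path, while an empty list sends it through file source V2. Outside the test harness, the same internal configs can be set when building a session; a rough sketch (the `master`/`appName` values are placeholders, and `SQLConf` is an internal API used here purely for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

val spark = SparkSession.builder()
  .master("local[*]")                                 // placeholder
  .appName("file-source-v2-everywhere")               // placeholder
  .config(SQLConf.USE_V1_SOURCE_READER_LIST.key, "")  // "" => read via file source V2
  .config(SQLConf.USE_V1_SOURCE_WRITER_LIST.key, "")  // "" => write via file source V2
  .getOrCreate()
```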

Changed file: StreamSuite.scala

@@ -218,11 +218,12 @@ class StreamSuite extends StreamTest {
       }
     }
 
-    // TODO: fix file source V2 as well.
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
-      val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
-      assertDF(df)
-      assertDF(df)
+    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
+    Seq("", "parquet").foreach { useV1SourceReader =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReader) {
+        assertDF(df)
+        assertDF(df)
+      }
     }
   }

Changed file: StreamingDeduplicationSuite.scala

@@ -197,32 +197,30 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
   }
 
   test("deduplicate with file sink") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
-      withTempDir { output =>
-        withTempDir { checkpointDir =>
-          val outputPath = output.getAbsolutePath
-          val inputData = MemoryStream[String]
-          val result = inputData.toDS().dropDuplicates()
-          val q = result.writeStream
-            .format("parquet")
-            .outputMode(Append)
-            .option("checkpointLocation", checkpointDir.getPath)
-            .start(outputPath)
-          try {
-            inputData.addData("a")
-            q.processAllAvailable()
-            checkDataset(spark.read.parquet(outputPath).as[String], "a")
-
-            inputData.addData("a") // Dropped
-            q.processAllAvailable()
-            checkDataset(spark.read.parquet(outputPath).as[String], "a")
-
-            inputData.addData("b")
-            q.processAllAvailable()
-            checkDataset(spark.read.parquet(outputPath).as[String], "a", "b")
-          } finally {
-            q.stop()
-          }
+    withTempDir { output =>
+      withTempDir { checkpointDir =>
+        val outputPath = output.getAbsolutePath
+        val inputData = MemoryStream[String]
+        val result = inputData.toDS().dropDuplicates()
+        val q = result.writeStream
+          .format("parquet")
+          .outputMode(Append)
+          .option("checkpointLocation", checkpointDir.getPath)
+          .start(outputPath)
+        try {
+          inputData.addData("a")
+          q.processAllAvailable()
+          checkDataset(spark.read.parquet(outputPath).as[String], "a")
+
+          inputData.addData("a") // Dropped
+          q.processAllAvailable()
+          checkDataset(spark.read.parquet(outputPath).as[String], "a")
+
+          inputData.addData("b")
+          q.processAllAvailable()
+          checkDataset(spark.read.parquet(outputPath).as[String], "a", "b")
+        } finally {
+          q.stop()
         }
       }
     }
   }

Changed file: StreamingQuerySuite.scala

@@ -995,59 +995,56 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     // Reading a file sink output in a batch query should detect the legacy _spark_metadata
     // directory and throw an error
-    // TODO: test file source V2 as well.
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
-      val e = intercept[SparkException] {
-        spark.read.load(outputDir.getCanonicalPath).as[Int]
-      }
-      assertMigrationError(e.getMessage, sparkMetadataDir, legacySparkMetadataDir)
-
-      // Restarting the streaming query should detect the legacy _spark_metadata directory and
-      // throw an error
-      val inputData = MemoryStream[Int]
-      val e2 = intercept[SparkException] {
-        inputData.toDF()
-          .writeStream
-          .format("parquet")
-          .option("checkpointLocation", checkpointDir.getCanonicalPath)
-          .start(outputDir.getCanonicalPath)
-      }
-      assertMigrationError(e2.getMessage, sparkMetadataDir, legacySparkMetadataDir)
-
-      // Move "_spark_metadata" to fix the file sink and test the checkpoint path.
-      FileUtils.moveDirectory(legacySparkMetadataDir, sparkMetadataDir)
-
-      // Restarting the streaming query should detect the legacy
-      // checkpoint path and throw an error.
-      val e3 = intercept[SparkException] {
-        inputData.toDF()
-          .writeStream
-          .format("parquet")
-          .option("checkpointLocation", checkpointDir.getCanonicalPath)
-          .start(outputDir.getCanonicalPath)
-      }
-      assertMigrationError(e3.getMessage, checkpointDir, legacyCheckpointDir)
-
-      // Fix the checkpoint path and verify that the user can migrate the issue by moving files.
-      FileUtils.moveDirectory(legacyCheckpointDir, checkpointDir)
-
-      val q = inputData.toDF()
-        .writeStream
-        .format("parquet")
-        .option("checkpointLocation", checkpointDir.getCanonicalPath)
-        .start(outputDir.getCanonicalPath)
-      try {
-        q.processAllAvailable()
-        // Check the query id to make sure it did use checkpoint
-        assert(q.id.toString == "09be7fb3-49d8-48a6-840d-e9c2ad92a898")
-
-        // Verify that the batch query can read "_spark_metadata" correctly after migration.
-        val df = spark.read.load(outputDir.getCanonicalPath)
-        assert(df.queryExecution.executedPlan.toString contains "MetadataLogFileIndex")
-        checkDatasetUnorderly(df.as[Int], 1, 2, 3)
-      } finally {
-        q.stop()
-      }
+    val e = intercept[SparkException] {
+      spark.read.load(outputDir.getCanonicalPath).as[Int]
+    }
+    assertMigrationError(e.getMessage, sparkMetadataDir, legacySparkMetadataDir)
+
+    // Restarting the streaming query should detect the legacy _spark_metadata directory and
+    // throw an error
+    val inputData = MemoryStream[Int]
+    val e2 = intercept[SparkException] {
+      inputData.toDF()
+        .writeStream
+        .format("parquet")
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .start(outputDir.getCanonicalPath)
+    }
+    assertMigrationError(e2.getMessage, sparkMetadataDir, legacySparkMetadataDir)
+
+    // Move "_spark_metadata" to fix the file sink and test the checkpoint path.
+    FileUtils.moveDirectory(legacySparkMetadataDir, sparkMetadataDir)
+
+    // Restarting the streaming query should detect the legacy
+    // checkpoint path and throw an error.
+    val e3 = intercept[SparkException] {
+      inputData.toDF()
+        .writeStream
+        .format("parquet")
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .start(outputDir.getCanonicalPath)
+    }
+    assertMigrationError(e3.getMessage, checkpointDir, legacyCheckpointDir)
+
+    // Fix the checkpoint path and verify that the user can migrate the issue by moving files.
+    FileUtils.moveDirectory(legacyCheckpointDir, checkpointDir)
+
+    val q = inputData.toDF()
+      .writeStream
+      .format("parquet")
+      .option("checkpointLocation", checkpointDir.getCanonicalPath)
+      .start(outputDir.getCanonicalPath)
+    try {
+      q.processAllAvailable()
+      // Check the query id to make sure it did use checkpoint
+      assert(q.id.toString == "09be7fb3-49d8-48a6-840d-e9c2ad92a898")
+
+      // Verify that the batch query can read "_spark_metadata" correctly after migration.
+      val df = spark.read.load(outputDir.getCanonicalPath)
+      assert(df.queryExecution.executedPlan.toString contains "MetadataLogFileIndex")
+      checkDatasetUnorderly(df.as[Int], 1, 2, 3)
+    } finally {
+      q.stop()
     }
   }