[SPARK-14078] Streaming Parquet Based FileSink

This PR adds a new `Sink` implementation that writes out Parquet files. In order to correctly handle partial failures while maintaining exactly-once semantics, the files for each batch are written out to a unique directory, and the list of written files is then atomically appended to a metadata log. When a Parquet-based `DataSource` is initialized for reading, we first check for this log directory and, when it is present, use it instead of file listing.

Unit tests are added, as well as a stress test that checks the answer after non-deterministic injected failures.
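
A minimal sketch of the write-then-commit pattern described above (hypothetical and simplified, not the code in this patch; `log`, `outputPath`, and `listWrittenFiles` stand in for the metadata log, the sink path, and the file-listing step):

```scala
import java.util.UUID
import org.apache.spark.sql.DataFrame

// Hypothetical sketch: write each batch to a fresh directory, then atomically
// record the resulting files in a metadata log keyed by batch id.
def addBatch(batchId: Long, data: DataFrame): Unit = {
  if (log.get(batchId).isEmpty) {                    // batch not yet committed
    val dir = s"$outputPath/${UUID.randomUUID()}"    // unique directory per attempt
    data.write.parquet(dir)                          // may leave partial files on failure
    log.add(batchId, listWrittenFiles(dir))          // atomic append; only one attempt wins
  }                                                  // replaying a committed batch is a no-op
}
```

Readers then consult the log (when present) instead of listing the directory, so files from partially failed attempts are never visible to queries.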

Author: Michael Armbrust <michael@databricks.com>

Closes #11897 from marmbrus/fileSink.
Michael Armbrust 2016-03-23 13:02:40 -07:00
parent 919bf32198
commit 6bc4be64f8
14 changed files with 430 additions and 15 deletions

@@ -91,6 +91,15 @@ trait ContinuousQuery {
*/
def awaitTermination(timeoutMs: Long): Boolean
/**
* Blocks until all available data in the source has been processed and committed to the sink.
* This method is intended for testing. Note that in the case of continually arriving data, this
* method may block forever. Additionally, this method is only guaranteed to block until data
* that has been synchronously appended to a [[org.apache.spark.sql.execution.streaming.Source]]
* prior to invocation has been processed (i.e. `getOffset` must immediately reflect the
* addition).
*/
def processAllAvailable(): Unit
/**
* Stops the execution of this query if it is running. This method blocks until the threads
performing execution have stopped.
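
As a usage sketch (assuming a `MemoryStream`-backed query like the ones in the new tests), `processAllAvailable` lets a test block until everything already added has reached the sink:

```scala
// inputData is a MemoryStream[Int]; query is the ContinuousQuery writing it out.
inputData.addData(1, 2, 3)    // synchronously appended, so getOffset reflects it immediately
query.processAllAvailable()   // returns once the batch containing 1, 2, 3 is committed
```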

@@ -32,12 +32,12 @@ import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}
*/
@Experimental
class ContinuousQueryException private[sql](
val query: ContinuousQuery,
@transient val query: ContinuousQuery,
val message: String,
val cause: Throwable,
val startOffset: Option[Offset] = None,
val endOffset: Option[Offset] = None
) extends Exception(message, cause) {
val endOffset: Option[Offset] = None)
extends Exception(message, cause) {
/** Time when the exception occurred */
val time: Long = System.currentTimeMillis

@@ -22,6 +22,7 @@ import java.util.ServiceLoader
import scala.collection.JavaConverters._
import scala.language.{existentials, implicitConversions}
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
import org.apache.hadoop.fs.Path
@@ -29,7 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
import org.apache.spark.util.Utils
@@ -176,14 +177,41 @@ case class DataSource(
/** Returns a sink that can be used to continually write data. */
def createSink(): Sink = {
val datasourceClass = providingClass.newInstance() match {
case s: StreamSinkProvider => s
providingClass.newInstance() match {
case s: StreamSinkProvider => s.createSink(sqlContext, options, partitionColumns)
case format: FileFormat =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
})
new FileStreamSink(sqlContext, path, format)
case _ =>
throw new UnsupportedOperationException(
s"Data source $className does not support streamed writing")
}
}
datasourceClass.createSink(sqlContext, options, partitionColumns)
/**
* Returns true if there is a single path that has a metadata log indicating which files should
* be read.
*/
def hasMetadata(path: Seq[String]): Boolean = {
path match {
case Seq(singlePath) =>
try {
val hdfsPath = new Path(singlePath)
val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir)
val res = fs.exists(metadataPath)
res
} catch {
case NonFatal(e) =>
logWarning(s"Error while looking for metadata directory.")
false
}
case _ => false
}
}
/** Create a resolved [[BaseRelation]] that can be used to read data from this [[DataSource]] */
@@ -200,6 +228,34 @@ case class DataSource(
case (_: RelationProvider, Some(_)) =>
throw new AnalysisException(s"$className does not allow user-specified schemas.")
// We are reading from the results of a streaming query. Load files from the metadata log
// instead of listing them using HDFS APIs.
case (format: FileFormat, _)
if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
val fileCatalog =
new StreamFileCatalog(sqlContext, basePath)
val dataSchema = userSpecifiedSchema.orElse {
format.inferSchema(
sqlContext,
caseInsensitiveOptions,
fileCatalog.allFiles())
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $format at ${fileCatalog.allFiles().mkString(",")}. " +
"It must be specified manually")
}
HadoopFsRelation(
sqlContext,
fileCatalog,
partitionSchema = fileCatalog.partitionSpec().partitionColumns,
dataSchema = dataSchema,
bucketSpec = None,
format,
options)
// This is a non-streaming file based datasource.
case (format: FileFormat, _) =>
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val globbedPaths = allPaths.flatMap { path =>
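
The practical effect of the streaming branch above is that a plain batch read of a directory produced by the new sink goes through the metadata log rather than a raw file listing. A hedged usage sketch (`outputDir` is assumed to hold sink output):

```scala
// Because outputDir contains a _spark_metadata directory, this read resolves its
// file list from the log, so partially written (uncommitted) files are ignored.
val committed = sqlContext.read.parquet(outputDir)
committed.count()
```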

@@ -64,6 +64,9 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
assert(sources.size == offsets.size)
new StreamProgress ++ sources.zip(offsets).collect { case (s, Some(o)) => (s, o) }
}
override def toString: String =
offsets.map(_.map(_.toString).getOrElse("-")).mkString("[", ", ", "]")
}
object CompositeOffset {

@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming
import java.util.UUID
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.sources.FileFormat
object FileStreamSink {
// The name of the subdirectory that is used to store metadata about which files are valid.
val metadataDir = "_spark_metadata"
}
/**
* A sink that writes out results to parquet files. Each batch is written out to a unique
* directory. After all of the files in a batch have been successfully written, the list of
* file paths is appended to the log atomically. In the case of partial failures, some duplicate
* data may be present in the target directory, but only one copy of each file will be present
* in the log.
*/
class FileStreamSink(
sqlContext: SQLContext,
path: String,
fileFormat: FileFormat) extends Sink with Logging {
private val basePath = new Path(path)
private val logPath = new Path(basePath, FileStreamSink.metadataDir)
private val fileLog = new HDFSMetadataLog[Seq[String]](sqlContext, logPath.toUri.toString)
override def addBatch(batchId: Long, data: DataFrame): Unit = {
if (fileLog.get(batchId).isDefined) {
logInfo(s"Skipping already committed batch $batchId")
} else {
val files = writeFiles(data)
if (fileLog.add(batchId, files)) {
logInfo(s"Committed batch $batchId")
} else {
logWarning(s"Race while writing batch $batchId")
}
}
}
/** Writes the [[DataFrame]] to a UUID-named dir, returning the list of file paths. */
private def writeFiles(data: DataFrame): Seq[String] = {
val ctx = sqlContext
val outputDir = path
val format = fileFormat
val schema = data.schema
val file = new Path(basePath, UUID.randomUUID().toString).toUri.toString
data.write.parquet(file)
sqlContext.read
.schema(data.schema)
.parquet(file)
.inputFiles
.map(new Path(_))
.filterNot(_.getName.startsWith("_"))
.map(_.toUri.toString)
}
override def toString: String = s"FileSink[$path]"
}
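
A hedged illustration of the idempotence this gives the sink on retries (`sink` and `batchDf` are assumed names, not code from this patch):

```scala
// First delivery of batch 7: data is written to a UUID-named directory and the
// resulting file list is committed to the metadata log under id 7.
sink.addBatch(7, batchDf)
// Redelivery after a failure or restart: the log already contains 7, so the call
// only logs "Skipping already committed batch 7" and writes nothing.
sink.addBatch(7, batchDf)
```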

@@ -44,7 +44,7 @@ class FileStreamSource(
private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
private val seenFiles = new OpenHashSet[String]
metadataLog.get(None, maxBatchId).foreach { case (batchId, files) =>
metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) =>
files.foreach(seenFiles.add)
}
@@ -114,18 +114,24 @@ class FileStreamSource(
val endId = end.asInstanceOf[LongOffset].offset
assert(startId <= endId)
val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten
logDebug(s"Return files from batches ${startId + 1}:$endId")
val files = metadataLog.get(Some(startId + 1), Some(endId)).map(_._2).flatten
logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
logDebug(s"Streaming ${files.mkString(", ")}")
dataFrameBuilder(files)
}
private def fetchAllFiles(): Seq[String] = {
fs.listStatus(new Path(path))
val startTime = System.nanoTime()
val files = fs.listStatus(new Path(path))
.filterNot(_.getPath.getName.startsWith("_"))
.map(_.getPath.toUri.toString)
val endTime = System.nanoTime()
logDebug(s"Listed ${files.size} file(s) in ${(endTime.toDouble - startTime) / 1000000}ms")
files
}
override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)
override def toString: String = s"FileSource[$path]"
}

@@ -170,11 +170,12 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
}
}
override def get(startId: Option[Long], endId: Long): Array[(Long, T)] = {
val batchIds = fc.util().listStatus(metadataPath, batchFilesFilter)
override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = {
val files = fc.util().listStatus(metadataPath, batchFilesFilter)
val batchIds = files
.map(_.getPath.getName.toLong)
.filter { batchId =>
batchId <= endId && (startId.isEmpty || batchId >= startId.get)
(endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get)
}
batchIds.sorted.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map {
case (batchId, metadataOption) =>

@@ -30,4 +30,6 @@ case class LongOffset(offset: Long) extends Offset {
def +(increment: Long): LongOffset = new LongOffset(offset + increment)
def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
override def toString: String = s"#$offset"
}

@@ -42,7 +42,7 @@ trait MetadataLog[T] {
* Return metadata for batches between startId (inclusive) and endId (inclusive). If `startId` is
* `None`, return all batches up to endId (inclusive); if `endId` is `None`, return all batches
* from startId onwards.
*/
def get(startId: Option[Long], endId: Long): Array[(Long, T)]
def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)]
/**
* Return the latest batch Id and its metadata if exist.

@@ -239,6 +239,12 @@ class StreamExecution(
logInfo(s"Committed offsets for batch $currentBatchId.")
true
} else {
noNewData = true
awaitBatchLock.synchronized {
// Wake up any threads that are waiting for the stream to progress.
awaitBatchLock.notifyAll()
}
false
}
}
@@ -334,6 +340,18 @@ class StreamExecution(
logDebug(s"Unblocked at $newOffset for $source")
}
/** A flag to indicate that a batch has completed with no new data available. */
@volatile private var noNewData = false
override def processAllAvailable(): Unit = {
noNewData = false
while (!noNewData) {
awaitBatchLock.synchronized { awaitBatchLock.wait(10000) }
if (streamDeathCause != null) { throw streamDeathCause }
}
if (streamDeathCause != null) { throw streamDeathCause }
}
override def awaitTermination(): Unit = {
if (state == INITIALIZED) {
throw new IllegalStateException("Cannot wait for termination on a query that has not started")

@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.PartitionSpec
import org.apache.spark.sql.sources.{FileCatalog, Partition}
import org.apache.spark.sql.types.StructType
class StreamFileCatalog(sqlContext: SQLContext, path: Path) extends FileCatalog with Logging {
val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
logInfo(s"Reading streaming file log from $metadataDirectory")
val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataDirectory.toUri.toString)
val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
override def paths: Seq[Path] = path :: Nil
override def partitionSpec(): PartitionSpec = PartitionSpec(StructType(Nil), Nil)
/**
* Returns all valid files grouped into partitions when the data is partitioned. If the data is
* unpartitioned, this will return a single partition with no partition values.
*
* @param filters the filters used to prune which partitions are returned. These filters must
* only refer to partition columns and this method will only return files
* where these predicates are guaranteed to evaluate to `true`. Thus, these
* filters will not need to be evaluated again on the returned data.
*/
override def listFiles(filters: Seq[Expression]): Seq[Partition] =
Partition(InternalRow.empty, allFiles()) :: Nil
override def getStatus(path: Path): Array[FileStatus] = fs.listStatus(path)
override def refresh(): Unit = {}
override def allFiles(): Seq[FileStatus] = {
fs.listStatus(metadataLog.get(None, None).flatMap(_._2).map(new Path(_)))
}
}

@@ -27,6 +27,8 @@ import org.apache.spark.sql.test.SharedSQLContext
class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
private implicit def toOption[A](a: A): Option[A] = Option(a)
test("basic") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)

@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.streaming
import org.apache.spark.sql.StreamTest
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
import testImplicits._
test("unpartitioned writing") {
val inputData = MemoryStream[Int]
val df = inputData.toDF()
val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
val checkpointDir = Utils.createTempDir("stream.checkpoint").getCanonicalPath
val query =
df.write
.format("parquet")
.option("checkpointLocation", checkpointDir)
.startStream(outputDir)
inputData.addData(1, 2, 3)
failAfter(streamingTimeout) { query.processAllAvailable() }
val outputDf = sqlContext.read.parquet(outputDir).as[Int]
checkDataset(
outputDf,
1, 2, 3)
}
}

@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.streaming
import java.io.File
import java.util.UUID
import scala.util.Random
import scala.util.control.NonFatal
import org.apache.spark.sql.{ContinuousQuery, ContinuousQueryException, StreamTest}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
/**
* A stress test for streaming queries that read and write files. This test consists of
* two threads:
* - one that writes out `numRecords` distinct integers to files of random sizes (the total
* number of records is fixed but each file's size / creation time is random).
* - another that continually restarts a buggy streaming query (i.e. fails with 5% probability on
* any partition).
*
* At the end, the resulting files are loaded and the answer is checked.
*/
class FileStressSuite extends StreamTest with SharedSQLContext {
import testImplicits._
test("fault tolerance stress test") {
val numRecords = 10000
val inputDir = Utils.createTempDir("stream.input").getCanonicalPath
val stagingDir = Utils.createTempDir("stream.staging").getCanonicalPath
val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
val checkpoint = Utils.createTempDir("stream.checkpoint").getCanonicalPath
@volatile
var continue = true
@volatile
var stream: ContinuousQuery = null
val writer = new Thread("stream writer") {
override def run(): Unit = {
var i = numRecords
while (i > 0) {
val count = Random.nextInt(100)
var j = 0
var string = ""
while (j < count && i > 0) {
if (i % 10000 == 0) { logError(s"Wrote record $i") }
string = string + i + "\n"
j += 1
i -= 1
}
val uuid = UUID.randomUUID().toString
val fileName = new File(stagingDir, uuid)
stringToFile(fileName, string)
fileName.renameTo(new File(inputDir, uuid))
val sleep = Random.nextInt(100)
Thread.sleep(sleep)
}
logError("== DONE WRITING ==")
var done = false
while (!done) {
try {
stream.processAllAvailable()
done = true
} catch {
case NonFatal(_) =>
}
}
continue = false
stream.stop()
}
}
writer.start()
val input = sqlContext.read.format("text").stream(inputDir)
def startStream(): ContinuousQuery = input
.repartition(5)
.as[String]
.mapPartitions { iter =>
val rand = Random.nextInt(100)
if (rand < 5) { sys.error("failure") }
iter.map(_.toLong)
}
.write
.format("parquet")
.option("checkpointLocation", checkpoint)
.startStream(outputDir)
var failures = 0
val streamThread = new Thread("stream runner") {
while (continue) {
if (failures % 10 == 0) { logError(s"Query restart #$failures") }
stream = startStream()
try {
stream.awaitTermination()
} catch {
case ce: ContinuousQueryException =>
failures += 1
}
}
}
streamThread.join()
logError(s"Stream restarted $failures times.")
assert(sqlContext.read.parquet(outputDir).distinct().count() == numRecords)
}
}