[SPARK-14716][SQL] Added support for partitioning in FileStreamSink

# What changes were proposed in this pull request?

Support partitioning in the file stream sink. This is implemented using a new, simpler code path for writing parquet files, both unpartitioned and partitioned. This new code path does not use output committers, as we will eventually write the file names to the metadata log to "commit" them.

This patch duplicates < 100 LOC from the WriterContainer, but it's far simpler than WriterContainer as it does not involve output committing. In addition, it introduces new APIs in FileFormat and OutputWriterFactory in an attempt to simplify them (no Job in the `FileFormat` API, no bucket and other parameters in `OutputWriterFactory.newInstance()`).
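To make the intended contract concrete, here is a Spark-free toy sketch of the simplified write path (the `Toy*` names are made up for illustration and are not Spark classes): the format builds a serializable writer factory on the driver, and each task asks that factory for a writer bound to one exact output path, with no output committer involved.

```scala
import java.io.PrintWriter

// Toy stand-ins for FileFormat / OutputWriterFactory / OutputWriter (illustration only).
trait ToyOutputWriter {
  def write(row: String): Unit
  def close(): Unit
}

trait ToyOutputWriterFactory extends Serializable {
  // Contract: the writer must write to exactly `path`, adding no extension or subdirectory.
  def newWriter(path: String): ToyOutputWriter
}

trait ToyFileFormat {
  // Driver-side: no Hadoop Job and no bucket id in the signature, unlike the older APIs.
  def buildWriter(options: Map[String, String]): ToyOutputWriterFactory
}

object ToyTextFormat extends ToyFileFormat {
  override def buildWriter(options: Map[String, String]): ToyOutputWriterFactory =
    new ToyOutputWriterFactory {
      override def newWriter(path: String): ToyOutputWriter = new ToyOutputWriter {
        private val out = new PrintWriter(path) // writes to the exact path it was given
        override def write(row: String): Unit = out.println(row)
        override def close(): Unit = out.close()
      }
    }
}
```

The real additions are `FileFormat.buildWriter(...)` and `OutputWriterFactory.newWriter(path)` in the diffs below; "committing" the files such writers produce is then just a matter of recording their names in the sink's metadata log.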

# Tests
- New unit tests to test the FileStreamSinkWriter for partitioned and unpartitioned files
- New unit test to partially test the FileStreamSink for partitioned files (does not test recovery of partition column data, as that requires a change in the StreamFileCatalog; future PR).
- Updated FileStressSuite to test the number of records read from partitioned output files.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #12409 from tdas/streaming-partitioned-parquet.
Commit 4ad492c403 (parent 5bd9a2f697), authored by Tathagata Das on 2016-05-03 10:58:26 -07:00.
9 changed files with 606 additions and 55 deletions.


@@ -940,7 +940,7 @@ class SQLTests(ReusedPySparkTestCase):
cq.processAllAvailable()
output_files = []
for _, _, files in os.walk(out):
output_files.extend([f for f in files if 'parquet' in f and not f.startswith('.')])
output_files.extend([f for f in files if not f.startswith('.')])
self.assertTrue(len(output_files) > 0)
self.assertTrue(len(os.listdir(chk)) > 0)
finally:
@@ -967,7 +967,7 @@ class SQLTests(ReusedPySparkTestCase):
cq.processAllAvailable()
output_files = []
for _, _, files in os.walk(out):
output_files.extend([f for f in files if 'parquet' in f and not f.startswith('.')])
output_files.extend([f for f in files if not f.startswith('.')])
self.assertTrue(len(output_files) > 0)
self.assertTrue(len(os.listdir(chk)) > 0)
self.assertFalse(os.path.isdir(fake1)) # should not have been created


@@ -203,13 +203,14 @@ case class DataSource(
def createSink(): Sink = {
providingClass.newInstance() match {
case s: StreamSinkProvider => s.createSink(sparkSession.wrapped, options, partitionColumns)
case format: FileFormat =>
case parquet: parquet.DefaultSource =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
})
new FileStreamSink(sparkSession, path, format)
new FileStreamSink(sparkSession, path, parquet, partitionColumns, options)
case _ =>
throw new UnsupportedOperationException(
s"Data source $className does not support streamed writing")

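For reference, the end-to-end flow this dispatch enables is exercised by the new FileStreamSinkSuite tests further down; condensed (and assuming that suite's SharedSQLContext/testImplicits harness is in scope), it looks roughly like this:

```scala
// Condensed from the new "partitioned writing and batch reading" test later in this diff.
val inputData = MemoryStream[Int]
val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath

val query =
  inputData.toDS()
    .map(i => (i, i * 1000))
    .toDF("id", "value")
    .write
    .format("parquet")
    .partitionBy("id")                           // written out as id=<value>/ subdirectories
    .option("checkpointLocation", checkpointDir)
    .startStream(outputDir)

inputData.addData(1, 2, 3)
query.processAllAvailable()
// Each batch's files are recorded in the sink's metadata log; reading the directory back as a
// batch query returns the data columns (recovering the partition column itself is future work).
sqlContext.read.parquet(outputDir).show()
query.stop()
```

Formats other than parquet fall through to the `case _` branch above and fail with "does not support streamed writing", which the new "supported formats" test checks.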

@@ -64,6 +64,20 @@ abstract class OutputWriterFactory extends Serializable {
bucketId: Option[Int], // TODO: This doesn't belong here...
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter
/**
* Returns a new instance of [[OutputWriter]] that will write data to the given path.
* This method gets called by each task on the executors to write [[InternalRow]]s to
* format-specific files. Compared to the other `newInstance()`, this is a newer API that
* passes only the path that the writer must write to. The writer must write to the exact path
* and not modify it (do not add subdirectories, extensions, etc.). All other
* file-format-specific information needed to create the writer must be passed
* through the [[OutputWriterFactory]] implementation.
* @since 2.0.0
*/
private[sql] def newWriter(path: String): OutputWriter = {
throw new UnsupportedOperationException("newInstance with just path not supported")
}
}
/**
@@ -223,6 +237,20 @@ trait FileFormat {
// Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats.
throw new UnsupportedOperationException(s"buildReader is not supported for $this")
}
/**
* Returns an [[OutputWriterFactory]] for generating output writers that can write data.
* This method is currently used only by FileStreamSinkWriter to generate output writers that
* do not use output committers to write data. The [[OutputWriterFactory]] returned by this
* method must implement `newWriter(path)`.
*/
def buildWriter(
sqlContext: SQLContext,
dataSchema: StructType,
options: Map[String, String]): OutputWriterFactory = {
// TODO: Remove this default implementation when the other formats have been ported
throw new UnsupportedOperationException(s"buildWriter is not supported for $this")
}
}
/**


@@ -41,13 +41,13 @@ import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructType}
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
private[sql] class DefaultSource
@@ -372,8 +372,120 @@ private[sql] class DefaultSource
}
}
}
override def buildWriter(
sqlContext: SQLContext,
dataSchema: StructType,
options: Map[String, String]): OutputWriterFactory = {
new ParquetOutputWriterFactory(
sqlContext.conf,
dataSchema,
sqlContext.sparkContext.hadoopConfiguration,
options)
}
}
/**
* A factory for generating OutputWriters for writing parquet files. This implementation is
* different from [[ParquetOutputWriter]] as it does not use any [[OutputCommitter]]. It simply
* writes the data to the path used to generate the output writer. Callers of this factory
* have to decide which files are to be considered committed.
*/
private[sql] class ParquetOutputWriterFactory(
sqlConf: SQLConf,
dataSchema: StructType,
hadoopConf: Configuration,
options: Map[String, String]) extends OutputWriterFactory {
private val serializableConf: SerializableConfiguration = {
val job = Job.getInstance(hadoopConf)
val conf = ContextUtil.getConfiguration(job)
val parquetOptions = new ParquetOptions(options, sqlConf)
// We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
// it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why
// we set it here is to set up the output committer class to `ParquetOutputCommitter`, which is
// bundled with `ParquetOutputFormat[Row]`.
job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
// We want to clear this temporary metadata so that it is not saved into the Parquet file.
// This metadata is only useful for detecting optional columns when pushing down filters.
val dataSchemaToWrite = StructType.removeMetadata(
StructType.metadataKeyForOptionalField,
dataSchema).asInstanceOf[StructType]
CatalystWriteSupport.setSchema(dataSchemaToWrite, conf)
// Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
// and `CatalystWriteSupport` (writing actual rows to Parquet files).
conf.set(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sqlConf.isParquetBinaryAsString.toString)
conf.set(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sqlConf.isParquetINT96AsTimestamp.toString)
conf.set(
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
sqlConf.writeLegacyParquetFormat.toString)
// Sets compression scheme
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodec)
new SerializableConfiguration(conf)
}
/**
* Returns an [[OutputWriter]] that writes data to the given path without using an
* [[OutputCommitter]].
*/
override private[sql] def newWriter(path: String): OutputWriter = new OutputWriter {
// Create TaskAttemptContext that is used to pass on Configuration to the ParquetRecordWriter
private val hadoopTaskAttempId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
private val hadoopAttemptContext = new TaskAttemptContextImpl(
serializableConf.value, hadoopTaskAttempId)
// Instance of ParquetRecordWriter that does not use OutputCommitter
private val recordWriter = createNoCommitterRecordWriter(path, hadoopAttemptContext)
override def write(row: Row): Unit = {
throw new UnsupportedOperationException("call writeInternal")
}
protected[sql] override def writeInternal(row: InternalRow): Unit = {
recordWriter.write(null, row)
}
override def close(): Unit = recordWriter.close(hadoopAttemptContext)
}
/** Create a [[ParquetRecordWriter]] that writes to the given path without using an OutputCommitter */
private def createNoCommitterRecordWriter(
path: String,
hadoopAttemptContext: TaskAttemptContext): RecordWriter[Void, InternalRow] = {
// Custom ParquetOutputFormat that disables the use of a committer and writes to the given path
val outputFormat = new ParquetOutputFormat[InternalRow]() {
override def getOutputCommitter(c: TaskAttemptContext): OutputCommitter = { null }
override def getDefaultWorkFile(c: TaskAttemptContext, ext: String): Path = { new Path(path) }
}
outputFormat.getRecordWriter(hadoopAttemptContext)
}
/** Disable the use of the older API. */
def newInstance(
path: String,
bucketId: Option[Int],
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
throw new UnsupportedOperationException(
"this version of newInstance not supported for " +
"ParquetOutputWriterFactory")
}
}
// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[sql] class ParquetOutputWriter(
path: String,

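The no-committer trick used by createNoCommitterRecordWriter above (return a null committer and pin getDefaultWorkFile to the target path) is plain Hadoop OutputFormat machinery and can be tried in isolation; a minimal sketch using TextOutputFormat and a hypothetical local path:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.{JobID, OutputCommitter, TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

object NoCommitterWriteSketch {
  def main(args: Array[String]): Unit = {
    val targetPath = new Path("/tmp/no-committer-demo.txt") // hypothetical output path
    // Synthetic TaskAttemptContext, just like ParquetOutputWriterFactory.newWriter above.
    val attemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
    val context = new TaskAttemptContextImpl(new Configuration(), attemptId)

    // Disable the committer and force the record writer to write to exactly `targetPath`.
    val format = new TextOutputFormat[NullWritable, Text]() {
      override def getOutputCommitter(c: TaskAttemptContext): OutputCommitter = null
      override def getDefaultWorkFile(c: TaskAttemptContext, ext: String): Path = targetPath
    }
    val writer = format.getRecordWriter(context)
    writer.write(NullWritable.get(), new Text("hello"))
    writer.close(context) // no commit/rename step: the file is already at its final location
  }
}
```

Whether such a file is visible to readers is then decided entirely by whether its name is recorded in the sink's log, which is exactly how FileStreamSink "commits" batches.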

@@ -19,11 +19,20 @@ package org.apache.spark.sql.execution.streaming
import java.util.UUID
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkEnv, SparkException, TaskContext, TaskContextImpl}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.UnsafeKVExternalSorter
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, PartitioningUtils}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration
object FileStreamSink {
// The name of the subdirectory that is used to store metadata about which files are valid.
@@ -40,28 +49,24 @@ object FileStreamSink {
class FileStreamSink(
sparkSession: SparkSession,
path: String,
fileFormat: FileFormat) extends Sink with Logging {
fileFormat: FileFormat,
partitionColumnNames: Seq[String],
options: Map[String, String]) extends Sink with Logging {
private val basePath = new Path(path)
private val logPath = new Path(basePath, FileStreamSink.metadataDir)
private val fileLog = new FileStreamSinkLog(sparkSession, logPath.toUri.toString)
private val fs = basePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
private val hadoopConf = sparkSession.sessionState.newHadoopConf()
private val fs = basePath.getFileSystem(hadoopConf)
override def addBatch(batchId: Long, data: DataFrame): Unit = {
if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
logInfo(s"Skipping already committed batch $batchId")
} else {
val files = fs.listStatus(writeFiles(data)).map { f =>
SinkFileStatus(
path = f.getPath.toUri.toString,
size = f.getLen,
isDir = f.isDirectory,
modificationTime = f.getModificationTime,
blockReplication = f.getReplication,
blockSize = f.getBlockSize,
action = FileStreamSinkLog.ADD_ACTION)
}
if (fileLog.add(batchId, files)) {
val writer = new FileStreamSinkWriter(
data, fileFormat, path, partitionColumnNames, hadoopConf, options)
val fileStatuses = writer.write()
if (fileLog.add(batchId, fileStatuses)) {
logInfo(s"Committed batch $batchId")
} else {
throw new IllegalStateException(s"Race while writing batch $batchId")
@@ -69,17 +74,192 @@ class FileStreamSink(
}
}
/** Writes the [[DataFrame]] to a UUID-named dir, returning the list of file paths. */
private def writeFiles(data: DataFrame): Array[Path] = {
val file = new Path(basePath, UUID.randomUUID().toString).toUri.toString
data.write.parquet(file)
sparkSession.read
.schema(data.schema)
.parquet(file)
.inputFiles
.map(new Path(_))
.filterNot(_.getName.startsWith("_"))
}
override def toString: String = s"FileSink[$path]"
}
/**
* Writes data given to a [[FileStreamSink]] to the given `basePath` in the given `fileFormat`,
* partitioned by the given `partitionColumnNames`. This writer always appends to the
* directory, even if it already contains data.
*/
class FileStreamSinkWriter(
data: DataFrame,
fileFormat: FileFormat,
basePath: String,
partitionColumnNames: Seq[String],
hadoopConf: Configuration,
options: Map[String, String]) extends Serializable with Logging {
PartitioningUtils.validatePartitionColumnDataTypes(
data.schema, partitionColumnNames, data.sqlContext.conf.caseSensitiveAnalysis)
private val serializableConf = new SerializableConfiguration(hadoopConf)
private val dataSchema = data.schema
private val dataColumns = data.logicalPlan.output
// Get the actual partition columns as attributes after matching them by name with
// the given column names.
private val partitionColumns = partitionColumnNames.map { col =>
val nameEquality = if (data.sparkSession.sessionState.conf.caseSensitiveAnalysis) {
org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
} else {
org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
}
data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse {
throw new RuntimeException(s"Partition column $col not found in schema $dataSchema")
}
}
// Columns that are to be written to the files. If there are partitioning columns, then
// those will not be written to the files.
private val writeColumns = {
val partitionSet = AttributeSet(partitionColumns)
dataColumns.filterNot(partitionSet.contains)
}
// An OutputWriterFactory for generating writers in the executors for writing the files.
private val outputWriterFactory =
fileFormat.buildWriter(data.sqlContext, writeColumns.toStructType, options)
/** Expressions that given a partition key build a string like: col1=val/col2=val/... */
private def partitionStringExpression: Seq[Expression] = {
partitionColumns.zipWithIndex.flatMap { case (c, i) =>
val escaped =
ScalaUDF(
PartitioningUtils.escapePathName _,
StringType,
Seq(Cast(c, StringType)),
Seq(StringType))
val str = If(IsNull(c), Literal(PartitioningUtils.DEFAULT_PARTITION_NAME), escaped)
val partitionName = Literal(c.name + "=") :: str :: Nil
if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
}
}
/** Generate a new output writer from the writer factory */
private def newOutputWriter(path: Path): OutputWriter = {
val newWriter = outputWriterFactory.newWriter(path.toString)
newWriter.initConverter(dataSchema)
newWriter
}
/** Write the dataframe to files. This gets called in the driver by the [[FileStreamSink]]. */
def write(): Array[SinkFileStatus] = {
data.sqlContext.sparkContext.runJob(
data.queryExecution.toRdd,
(taskContext: TaskContext, iterator: Iterator[InternalRow]) => {
if (partitionColumns.isEmpty) {
Seq(writePartitionToSingleFile(iterator))
} else {
writePartitionToPartitionedFiles(iterator)
}
}).flatten
}
/**
* Writes an RDD partition to a single file without dynamic partitioning.
* This gets called in the executor, and it uses a [[OutputWriter]] to write the data.
*/
def writePartitionToSingleFile(iterator: Iterator[InternalRow]): SinkFileStatus = {
var writer: OutputWriter = null
try {
val path = new Path(basePath, UUID.randomUUID.toString)
val fs = path.getFileSystem(serializableConf.value)
writer = newOutputWriter(path)
while (iterator.hasNext) {
writer.writeInternal(iterator.next)
}
writer.close()
writer = null
SinkFileStatus(fs.getFileStatus(path))
} catch {
case cause: Throwable =>
logError("Aborting task.", cause)
// call failure callbacks first, so we have a chance to clean up the writer.
TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
throw new SparkException("Task failed while writing rows.", cause)
} finally {
if (writer != null) {
writer.close()
}
}
}
/**
* Writes an RDD partition to multiple dynamically partitioned files.
* This gets called in the executor. It first sorts the data based on the partitioning columns
* and then writes the data of each key to separate files using [[OutputWriter]]s.
*/
def writePartitionToPartitionedFiles(iterator: Iterator[InternalRow]): Seq[SinkFileStatus] = {
// Returns the partitioning columns for sorting
val getSortingKey = UnsafeProjection.create(partitionColumns, dataColumns)
// Returns the data columns to be written given an input row
val getOutputRow = UnsafeProjection.create(writeColumns, dataColumns)
// Returns the partition path given a partition key
val getPartitionString =
UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionColumns)
// Sort the data before writing, so that we only need one writer open at a time.
val sorter = new UnsafeKVExternalSorter(
partitionColumns.toStructType,
StructType.fromAttributes(writeColumns),
SparkEnv.get.blockManager,
SparkEnv.get.serializerManager,
TaskContext.get().taskMemoryManager().pageSizeBytes)
while (iterator.hasNext) {
val currentRow = iterator.next()
sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
}
logDebug(s"Sorting complete. Writing out partition files one at a time.")
val sortedIterator = sorter.sortedIterator()
val paths = new ArrayBuffer[Path]
// Write the sorted data to partitioned files, one for each unique key
var currentWriter: OutputWriter = null
try {
var currentKey: UnsafeRow = null
while (sortedIterator.next()) {
val nextKey = sortedIterator.getKey
// If key changes, close current writer, and open a new writer to a new partitioned file
if (currentKey != nextKey) {
if (currentWriter != null) {
currentWriter.close()
currentWriter = null
}
currentKey = nextKey.copy()
val partitionPath = getPartitionString(currentKey).getString(0)
val path = new Path(new Path(basePath, partitionPath), UUID.randomUUID.toString)
paths += path
currentWriter = newOutputWriter(path)
logInfo(s"Writing partition $currentKey to $path")
}
currentWriter.writeInternal(sortedIterator.getValue)
}
if (currentWriter != null) {
currentWriter.close()
currentWriter = null
}
if (paths.nonEmpty) {
val fs = paths.head.getFileSystem(serializableConf.value)
paths.map(p => SinkFileStatus(fs.getFileStatus(p)))
} else Seq.empty
} catch {
case cause: Throwable =>
logError("Aborting task.", cause)
// call failure callbacks first, so we have a chance to clean up the writer.
TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
throw new SparkException("Task failed while writing rows.", cause)
} finally {
if (currentWriter != null) {
currentWriter.close()
}
}
}
}
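To see the shape of writePartitionToPartitionedFiles without the Spark machinery, here is a self-contained sketch of the same pattern (sort rows by partition key, then stream through them and roll to a new writer whenever the key changes); the data and the println standing in for an OutputWriter are made up:

```scala
object SortedPartitionedWriteSketch {
  def main(args: Array[String]): Unit = {
    // (partition key, payload) pairs, analogous to (getSortingKey(row), getOutputRow(row)).
    val rows = Seq("2016-05-03" -> "a", "2016-05-01" -> "b", "2016-05-03" -> "c", "2016-05-01" -> "d")

    // 1. Sort by the partition key so that all rows of a key are contiguous
    //    (the role of UnsafeKVExternalSorter above).
    val sorted = rows.sortBy(_._1)

    // 2. Stream through the sorted rows; whenever the key changes, close the current "writer"
    //    and open a new one for a fresh file under date=<key>/ (here, just a buffer + println).
    var currentKey: Option[String] = None
    var currentFile = List.empty[String]
    def closeCurrentWriter(): Unit = currentKey.foreach { key =>
      println(s"date=$key/<uuid> -> ${currentFile.reverse.mkString(", ")}")
      currentFile = Nil
    }
    for ((key, value) <- sorted) {
      if (!currentKey.contains(key)) {
        closeCurrentWriter()
        currentKey = Some(key)
      }
      currentFile = value :: currentFile
    }
    closeCurrentWriter() // flush the writer for the last key
  }
}
```

In the real writer the directory name comes from partitionStringExpression, which escapes each value and substitutes PartitioningUtils.DEFAULT_PARTITION_NAME for nulls, and each per-key file gets a random UUID name under that directory.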


@@ -54,6 +54,19 @@ case class SinkFileStatus(
}
}
object SinkFileStatus {
def apply(f: FileStatus): SinkFileStatus = {
SinkFileStatus(
path = f.getPath.toUri.toString,
size = f.getLen,
isDir = f.isDirectory,
modificationTime = f.getModificationTime,
blockReplication = f.getReplication,
blockSize = f.getBlockSize,
action = FileStreamSinkLog.ADD_ACTION)
}
}
/**
* A special log for [[FileStreamSink]]. It will write one log file for each batch. The first line
* of the log file is the version number, and there are multiple JSON lines following. Each JSON


@@ -216,8 +216,9 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
new FileContextManager(metadataPath, hadoopConf)
} catch {
case e: UnsupportedFileSystemException =>
logWarning("Could not use FileContext API for managing metadata log file. The log may be" +
"inconsistent under failures.", e)
logWarning("Could not use FileContext API for managing metadata log files at path " +
s"$metadataPath. Using FileSystem API instead for managing log files. The log may be " +
s"inconsistent under failures.")
new FileSystemManager(metadataPath, hadoopConf)
}
}


@@ -17,33 +17,223 @@
package org.apache.spark.sql.streaming
import org.apache.spark.sql.StreamTest
import org.apache.spark.sql.execution.streaming.MemoryStream
import java.io.File
import org.apache.commons.io.FileUtils
import org.apache.commons.io.filefilter.{DirectoryFileFilter, RegexFileFilter}
import org.apache.spark.sql.{ContinuousQuery, Row, StreamTest}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.datasources.parquet
import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
import testImplicits._
test("unpartitioned writing") {
test("FileStreamSinkWriter - unpartitioned data") {
val path = Utils.createTempDir()
path.delete()
val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
val fileFormat = new parquet.DefaultSource()
def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = {
val df = sqlContext
.range(start, end, 1, numPartitions)
.select($"id", lit(100).as("data"))
val writer = new FileStreamSinkWriter(
df, fileFormat, path.toString, partitionColumnNames = Nil, hadoopConf, Map.empty)
writer.write().map(_.path.stripPrefix("file://"))
}
// Write and check whether new files are written correctly
val files1 = writeRange(0, 10, 2)
assert(files1.size === 2, s"unexpected number of files: $files1")
checkFilesExist(path, files1, "file not written")
checkAnswer(sqlContext.read.load(path.getCanonicalPath), (0 until 10).map(Row(_, 100)))
// Append and check whether new files are written correctly and old files still exist
val files2 = writeRange(10, 20, 3)
assert(files2.size === 3, s"unexpected number of files: $files2")
assert(files2.intersect(files1).isEmpty, "old files returned")
checkFilesExist(path, files2, s"New file not written")
checkFilesExist(path, files1, s"Old file not found")
checkAnswer(sqlContext.read.load(path.getCanonicalPath), (0 until 20).map(Row(_, 100)))
}
test("FileStreamSinkWriter - partitioned data") {
implicit val e = ExpressionEncoder[java.lang.Long]
val path = Utils.createTempDir()
path.delete()
val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
val fileFormat = new parquet.DefaultSource()
def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = {
val df = sqlContext
.range(start, end, 1, numPartitions)
.flatMap(x => Iterator(x, x, x)).toDF("id")
.select($"id", lit(100).as("data1"), lit(1000).as("data2"))
require(df.rdd.partitions.size === numPartitions)
val writer = new FileStreamSinkWriter(
df, fileFormat, path.toString, partitionColumnNames = Seq("id"), hadoopConf, Map.empty)
writer.write().map(_.path.stripPrefix("file://"))
}
def checkOneFileWrittenPerKey(keys: Seq[Int], filesWritten: Seq[String]): Unit = {
keys.foreach { id =>
assert(
filesWritten.count(_.contains(s"/id=$id/")) == 1,
s"no file for id=$id. all files: \n\t${filesWritten.mkString("\n\t")}"
)
}
}
// Write and check whether new files are written correctly
val files1 = writeRange(0, 10, 2)
assert(files1.size === 10, s"unexpected number of files:\n${files1.mkString("\n")}")
checkFilesExist(path, files1, "file not written")
checkOneFileWrittenPerKey(0 until 10, files1)
val answer1 = (0 until 10).flatMap(x => Iterator(x, x, x)).map(Row(100, 1000, _))
checkAnswer(sqlContext.read.load(path.getCanonicalPath), answer1)
// Append and check whether new files are written correctly and old files still exist
val files2 = writeRange(0, 20, 3)
assert(files2.size === 20, s"unexpected number of files:\n${files2.mkString("\n")}")
assert(files2.intersect(files1).isEmpty, "old files returned")
checkFilesExist(path, files2, s"New file not written")
checkFilesExist(path, files1, s"Old file not found")
checkOneFileWrittenPerKey(0 until 20, files2)
val answer2 = (0 until 20).flatMap(x => Iterator(x, x, x)).map(Row(100, 1000, _))
checkAnswer(sqlContext.read.load(path.getCanonicalPath), answer1 ++ answer2)
}
test("FileStreamSink - unpartitioned writing and batch reading") {
val inputData = MemoryStream[Int]
val df = inputData.toDF()
val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
val query =
var query: ContinuousQuery = null
try {
query =
df.write
.format("parquet")
.option("checkpointLocation", checkpointDir)
.startStream(outputDir)
inputData.addData(1, 2, 3)
failAfter(streamingTimeout) { query.processAllAvailable() }
failAfter(streamingTimeout) {
query.processAllAvailable()
}
val outputDf = sqlContext.read.parquet(outputDir).as[Int]
checkDataset(
outputDf,
1, 2, 3)
checkDataset(outputDf, 1, 2, 3)
} finally {
if (query != null) {
query.stop()
}
}
}
test("FileStreamSink - partitioned writing and batch reading [IGNORES PARTITION COLUMN]") {
val inputData = MemoryStream[Int]
val ds = inputData.toDS()
val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
var query: ContinuousQuery = null
try {
query =
ds.map(i => (i, i * 1000))
.toDF("id", "value")
.write
.format("parquet")
.partitionBy("id")
.option("checkpointLocation", checkpointDir)
.startStream(outputDir)
inputData.addData(1, 2, 3)
failAfter(streamingTimeout) {
query.processAllAvailable()
}
// TODO (tdas): Test whether the partition column can be read back or not
val outputDf = sqlContext.read.parquet(outputDir)
checkDataset(
outputDf.as[Int],
1000, 2000, 3000)
} finally {
if (query != null) {
query.stop()
}
}
}
test("FileStreamSink - supported formats") {
def testFormat(format: Option[String]): Unit = {
val inputData = MemoryStream[Int]
val ds = inputData.toDS()
val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
var query: ContinuousQuery = null
try {
val writer =
ds.map(i => (i, i * 1000))
.toDF("id", "value")
.write
if (format.nonEmpty) {
writer.format(format.get)
}
query = writer
.option("checkpointLocation", checkpointDir)
.startStream(outputDir)
} finally {
if (query != null) {
query.stop()
}
}
}
testFormat(None) // should not throw an error, as the default format is parquet when none is specified
testFormat(Some("parquet"))
val e = intercept[UnsupportedOperationException] {
testFormat(Some("text"))
}
Seq("text", "not support", "stream").foreach { s =>
assert(e.getMessage.contains(s))
}
}
private def checkFilesExist(dir: File, expectedFiles: Seq[String], msg: String): Unit = {
import scala.collection.JavaConverters._
val files =
FileUtils.listFiles(dir, new RegexFileFilter("[^.]+"), DirectoryFileFilter.DIRECTORY)
.asScala
.map(_.getCanonicalPath)
.toSet
expectedFiles.foreach { f =>
assert(files.contains(f),
s"\n$msg\nexpected file:\n\t$f\nfound files:\n${files.mkString("\n\t")}")
}
}
}


@@ -41,7 +41,15 @@ import org.apache.spark.util.Utils
class FileStressSuite extends StreamTest with SharedSQLContext {
import testImplicits._
test("fault tolerance stress test") {
testQuietly("fault tolerance stress test - unpartitioned output") {
stressTest(partitionWrites = false)
}
testQuietly("fault tolerance stress test - partitioned output") {
stressTest(partitionWrites = true)
}
def stressTest(partitionWrites: Boolean): Unit = {
val numRecords = 10000
val inputDir = Utils.createTempDir(namePrefix = "stream.input").getCanonicalPath
val stagingDir = Utils.createTempDir(namePrefix = "stream.staging").getCanonicalPath
@@ -93,18 +101,36 @@ class FileStressSuite extends StreamTest with SharedSQLContext {
writer.start()
val input = sqlContext.read.format("text").stream(inputDir)
def startStream(): ContinuousQuery = input
def startStream(): ContinuousQuery = {
val output = input
.repartition(5)
.as[String]
.mapPartitions { iter =>
val rand = Random.nextInt(100)
if (rand < 5) { sys.error("failure") }
if (rand < 10) {
sys.error("failure")
}
iter.map(_.toLong)
}
.map(x => (x % 400, x.toString))
.toDF("id", "data")
if (partitionWrites) {
output
.write
.partitionBy("id")
.format("parquet")
.option("checkpointLocation", checkpoint)
.startStream(outputDir)
} else {
output
.write
.format("parquet")
.option("checkpointLocation", checkpoint)
.startStream(outputDir)
}
}
var failures = 0
val streamThread = new Thread("stream runner") {