[SPARK-22781][SS] Support creating streaming dataset with ORC files

## What changes were proposed in this pull request?

Like `Parquet`, users can use `ORC` with Apache Spark Structured Streaming. This PR adds `orc()` to `DataStreamReader` (Scala/Python) so that streaming datasets can be created with the ORC file format as easily as with the other file formats. It also adds test coverage for the ORC data source and updates the documentation.

**BEFORE**

```scala
scala> spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
<console>:24: error: value orc is not a member of org.apache.spark.sql.streaming.DataStreamReader
       spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
```

**AFTER**
```scala
scala> spark.readStream.schema("a int").orc("/tmp/orc_ss").writeStream.format("console").start()
res0: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@678b3746

scala>
-------------------------------------------
Batch: 0
-------------------------------------------
+---+
|  a|
+---+
|  1|
+---+
```
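
For reference, the `/tmp/orc_ss` directory in the example above can be populated with an ordinary batch write before the streaming query starts. The following spark-shell sketch is illustrative only; the path and the one-row dataset are assumptions, not part of this patch:

```scala
// Hypothetical setup for the AFTER example: write one ORC file containing a = 1
// into the directory that the streaming query will watch (spark-shell already has
// spark.implicits._ in scope, so toDF is available).
Seq(1).toDF("a").write.mode("append").orc("/tmp/orc_ss")

// Start the streaming query with the new DataStreamReader.orc API; the existing
// file is picked up in Batch 0, producing the console output shown above.
val query = spark.readStream
  .schema("a int")
  .orc("/tmp/orc_ss")
  .writeStream
  .format("console")
  .start()
```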

## How was this patch tested?

Pass the newly added test cases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #19975 from dongjoon-hyun/SPARK-22781.

@@ -493,7 +493,7 @@ returned by `SparkSession.readStream()`. In [R](api/R/read.stream.html), with th
#### Input Sources
There are a few built-in sources.
- **File source** - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, orc, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
- **Kafka source** - Reads data from Kafka. It's compatible with Kafka broker versions 0.10.0 or higher. See the [Kafka Integration Guide](structured-streaming-kafka-integration.html) for more details.

@@ -490,6 +490,23 @@ class DataStreamReader(OptionUtils):
        else:
            raise TypeError("path can be only a single string")

    @since(2.3)
    def orc(self, path):
        """Loads an ORC file stream, returning the result as a :class:`DataFrame`.

        .. note:: Evolving.

        >>> orc_sdf = spark.readStream.schema(sdf_schema).orc(tempfile.mkdtemp())
        >>> orc_sdf.isStreaming
        True
        >>> orc_sdf.schema == sdf_schema
        True
        """
        if isinstance(path, basestring):
            return self._df(self._jreader.orc(path))
        else:
            raise TypeError("path can be only a single string")

    @since(2.0)
    def parquet(self, path):
        """Loads a Parquet file stream, returning the result as a :class:`DataFrame`.

@@ -298,6 +298,21 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
   */
  def csv(path: String): DataFrame = format("csv").load(path)

  /**
   * Loads an ORC file stream, returning the result as a `DataFrame`.
   *
   * You can set the following ORC-specific option(s) for reading ORC files:
   * <ul>
   * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
   * considered in every trigger.</li>
   * </ul>
   *
   * @since 2.3.0
   */
  def orc(path: String): DataFrame = {
    format("orc").load(path)
  }

  /**
   * Loads a Parquet file stream, returning the result as a `DataFrame`.
   *
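
As a usage note for the `maxFilesPerTrigger` option mentioned in the Scaladoc above, it is supplied through `option(...)` on the reader before calling `orc(...)`. A minimal sketch, with a hypothetical path and limit value:

```scala
// Sketch: cap each micro-batch at one newly discovered ORC file.
// "/tmp/orc_ss" and the limit of 1 are illustrative assumptions.
val df = spark.readStream
  .schema("a int")
  .option("maxFilesPerTrigger", "1")
  .orc("/tmp/orc_ss")
```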

@@ -305,6 +305,10 @@ class FileStreamSinkSuite extends StreamTest {
    testFormat(Some("parquet"))
  }

  test("orc") {
    testFormat(Some("orc"))
  }

  test("text") {
    testFormat(Some("text"))
  }

@@ -87,6 +87,28 @@ abstract class FileStreamSourceTest
    }
  }

  case class AddOrcFileData(data: DataFrame, src: File, tmp: File) extends AddFileData {
    override def addData(source: FileStreamSource): Unit = {
      AddOrcFileData.writeToFile(data, src, tmp)
    }
  }

  object AddOrcFileData {
    def apply(seq: Seq[String], src: File, tmp: File): AddOrcFileData = {
      AddOrcFileData(seq.toDS().toDF(), src, tmp)
    }

    /** Write orc files in a temp dir, and move the individual files to the 'src' dir */
    def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
      val tmpDir = Utils.tempFileWith(new File(tmp, "orc"))
      df.write.orc(tmpDir.getCanonicalPath)
      src.mkdirs()
      tmpDir.listFiles().foreach { f =>
        f.renameTo(new File(src, s"${f.getName}"))
      }
    }
  }

  case class AddParquetFileData(data: DataFrame, src: File, tmp: File) extends AddFileData {
    override def addData(source: FileStreamSource): Unit = {
      AddParquetFileData.writeToFile(data, src, tmp)

@@ -249,6 +271,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
    }
  }

  // =============== ORC file stream schema tests ================

  test("FileStreamSource schema: orc, existing files, no schema") {
    withTempDir { src =>
      Seq("a", "b", "c").toDS().as("userColumn").toDF().write
        .mode(org.apache.spark.sql.SaveMode.Overwrite)
        .orc(src.getCanonicalPath)

      // Without schema inference, should throw error
      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") {
        intercept[IllegalArgumentException] {
          createFileStreamSourceAndGetSchema(
            format = Some("orc"), path = Some(src.getCanonicalPath), schema = None)
        }
      }

      // With schema inference, should infer correct schema
      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
        val schema = createFileStreamSourceAndGetSchema(
          format = Some("orc"), path = Some(src.getCanonicalPath), schema = None)
        assert(schema === new StructType().add("value", StringType))
      }
    }
  }

  test("FileStreamSource schema: orc, existing files, schema") {
    withTempPath { src =>
      Seq("a", "b", "c").toDS().as("oldUserColumn").toDF()
        .write.orc(new File(src, "1").getCanonicalPath)
      val userSchema = new StructType().add("userColumn", StringType)
      val schema = createFileStreamSourceAndGetSchema(
        format = Some("orc"), path = Some(src.getCanonicalPath), schema = Some(userSchema))
      assert(schema === userSchema)
    }
  }

  // =============== Parquet file stream schema tests ================

  test("FileStreamSource schema: parquet, existing files, no schema") {

@@ -508,6 +566,59 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
    }
  }

  // =============== ORC file stream tests ================

  test("read from orc files") {
    withTempDirs { case (src, tmp) =>
      val fileStream = createFileStream("orc", src.getCanonicalPath, Some(valueSchema))
      val filtered = fileStream.filter($"value" contains "keep")

      testStream(filtered)(
        AddOrcFileData(Seq("drop1", "keep2", "keep3"), src, tmp),
        CheckAnswer("keep2", "keep3"),
        StopStream,
        AddOrcFileData(Seq("drop4", "keep5", "keep6"), src, tmp),
        StartStream(),
        CheckAnswer("keep2", "keep3", "keep5", "keep6"),
        AddOrcFileData(Seq("drop7", "keep8", "keep9"), src, tmp),
        CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
      )
    }
  }

  test("read from orc files with changing schema") {
    withTempDirs { case (src, tmp) =>
      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {

        // Add a file so that we can infer its schema
        AddOrcFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)

        val fileStream = createFileStream("orc", src.getCanonicalPath)

        // FileStreamSource should infer the column "k"
        assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))

        // After creating DF and before starting stream, add data with different schema
        // Should not affect the inferred schema any more
        AddOrcFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)

        testStream(fileStream)(
          // Should not pick up column v in the file added before start
          AddOrcFileData(Seq("value2").toDF("k"), src, tmp),
          CheckAnswer("value0", "value1", "value2"),

          // Should read data in column k, and ignore v
          AddOrcFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
          CheckAnswer("value0", "value1", "value2", "value3"),

          // Should ignore rows that do not have the necessary k column
          AddOrcFileData(Seq("value5").toDF("v"), src, tmp),
          CheckAnswer("value0", "value1", "value2", "value3", null)
        )
      }
    }
  }

  // =============== Parquet file stream tests ================

  test("read from parquet files") {