[SPARK-16116][SQL] ConsoleSink should not require checkpointLocation
## What changes were proposed in this pull request? When the user uses `ConsoleSink`, we should use a temp location if `checkpointLocation` is not specified. ## How was this patch tested? The added unit test. Author: Shixiong Zhu <shixiong@databricks.com> Closes #13817 from zsxwing/console-checkpoint.
This commit is contained in:
parent
b5a997667f
commit
d85bb10ce4
|
@ -272,6 +272,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
|
|||
useTempCheckpointLocation = true,
|
||||
trigger = trigger)
|
||||
} else {
|
||||
val (useTempCheckpointLocation, recoverFromCheckpointLocation) =
|
||||
if (source == "console") {
|
||||
(true, false)
|
||||
} else {
|
||||
(false, true)
|
||||
}
|
||||
val dataSource =
|
||||
DataSource(
|
||||
df.sparkSession,
|
||||
|
@ -284,6 +290,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
|
|||
df,
|
||||
dataSource.createSink(outputMode),
|
||||
outputMode,
|
||||
useTempCheckpointLocation = useTempCheckpointLocation,
|
||||
recoverFromCheckpointLocation = recoverFromCheckpointLocation,
|
||||
trigger = trigger)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -457,4 +457,14 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("ConsoleSink should not require checkpointLocation") {
|
||||
LastOptions.clear()
|
||||
val df = spark.readStream
|
||||
.format("org.apache.spark.sql.streaming.test")
|
||||
.load()
|
||||
|
||||
val sq = df.writeStream.format("console").start()
|
||||
sq.stop()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue