[SPARK-26081][SQL] Prevent empty files for empty partitions in Text datasources

## What changes were proposed in this pull request?

In this PR, I propose to postpone creation of the `OutputStream`/`UnivocityGenerator`/`JacksonGenerator` until the first row actually has to be written. This prevents creation of empty files for empty partitions, so such files no longer have to be opened and read back when loading data from the location.
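
The same lazy-initialization pattern is applied to all three writers: the generator/stream is kept in an `Option`, materialized on the first `write`, and `close()` becomes a no-op when nothing was ever written. Below is a minimal standalone sketch of the idea; the `LazyTextWriter` class and the plain `FileOutputStream` are illustrative only, not the actual Spark classes changed in the diffs further down.

```scala
import java.io.{FileOutputStream, OutputStream}

// Illustrative sketch only: a writer that opens its output lazily, so a
// partition that never calls write() leaves no file behind.
class LazyTextWriter(path: String) {
  private var outputStream: Option[OutputStream] = None

  def write(line: String): Unit = {
    // Open the underlying stream only when the first row arrives.
    val os = outputStream.getOrElse {
      val newStream = new FileOutputStream(path)
      outputStream = Some(newStream)
      newStream
    }
    os.write((line + "\n").getBytes("UTF-8"))
  }

  // No row written => no stream opened => no empty file left on disk.
  def close(): Unit = outputStream.foreach(_.close())
}
```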

## How was this patch tested?

Added tests for the Text, JSON and CSV datasources that write an empty dataset and check that no files are produced.

Closes #23052 from MaxGekk/text-empty-files.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>

@@ -169,13 +169,19 @@ private[csv] class CsvOutputWriter(
     context: TaskAttemptContext,
     params: CSVOptions) extends OutputWriter with Logging {
-  private val charset = Charset.forName(params.charset)
+  private var univocityGenerator: Option[UnivocityGenerator] = None
-  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
+  override def write(row: InternalRow): Unit = {
+    val gen = univocityGenerator.getOrElse {
+      val charset = Charset.forName(params.charset)
+      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
+      val newGen = new UnivocityGenerator(dataSchema, os, params)
+      univocityGenerator = Some(newGen)
+      newGen
+    }
-  private val gen = new UnivocityGenerator(dataSchema, writer, params)
+    gen.write(row)
+  }
-  override def write(row: InternalRow): Unit = gen.write(row)
-  override def close(): Unit = gen.close()
+  override def close(): Unit = univocityGenerator.map(_.close())
 }

@@ -175,19 +175,20 @@ private[json] class JsonOutputWriter(
       " which can be read back by Spark only if multiLine is enabled.")
   }
-  private val writer = CodecStreams.createOutputStreamWriter(
-    context, new Path(path), encoding)
-  // create the Generator without separator inserted between 2 records
-  private[this] val gen = new JacksonGenerator(dataSchema, writer, options)
+  private var jacksonGenerator: Option[JacksonGenerator] = None
   override def write(row: InternalRow): Unit = {
+    val gen = jacksonGenerator.getOrElse {
+      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
+      // create the Generator without separator inserted between 2 records
+      val newGen = new JacksonGenerator(dataSchema, os, options)
+      jacksonGenerator = Some(newGen)
+      newGen
+    }
     gen.write(row)
     gen.writeLineEnding()
   }
-  override def close(): Unit = {
-    gen.close()
-    writer.close()
-  }
+  override def close(): Unit = jacksonGenerator.map(_.close())
 }

@@ -17,6 +17,8 @@
 package org.apache.spark.sql.execution.datasources.text
+import java.io.OutputStream
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -148,17 +150,23 @@ class TextOutputWriter(
     context: TaskAttemptContext)
   extends OutputWriter {
-  private val writer = CodecStreams.createOutputStream(context, new Path(path))
+  private var outputStream: Option[OutputStream] = None
   override def write(row: InternalRow): Unit = {
+    val os = outputStream.getOrElse{
+      val newStream = CodecStreams.createOutputStream(context, new Path(path))
+      outputStream = Some(newStream)
+      newStream
+    }
     if (!row.isNullAt(0)) {
       val utf8string = row.getUTF8String(0)
-      utf8string.writeTo(writer)
+      utf8string.writeTo(os)
     }
-    writer.write(lineSeparator)
+    os.write(lineSeparator)
   }
   override def close(): Unit = {
-    writer.close()
+    outputStream.map(_.close())
   }
 }

@@ -1986,4 +1986,13 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     }.getMessage
     assert(errMsg2.contains("'lineSep' can contain only 1 character"))
   }
+
+  test("do not produce empty files for empty partitions") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.emptyDataset[String].write.csv(path)
+      val files = new File(path).listFiles()
+      assert(!files.exists(_.getName.endsWith("csv")))
+    }
+  }
 }

@@ -1897,7 +1897,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       .text(path)
     val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
-    assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
+    assert(jsonDF.count() === corruptRecordCount)
     assert(jsonDF.schema === new StructType()
       .add("_corrupt_record", StringType)
       .add("dummy", StringType))
@@ -1910,7 +1910,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
        F.count($"dummy").as("valid"),
        F.count($"_corrupt_record").as("corrupt"),
        F.count("*").as("count"))
-      checkAnswer(counts, Row(1, 5, 7)) // null row for empty file
+      checkAnswer(counts, Row(1, 4, 6)) // null row for empty file
     }
   }
@@ -2555,4 +2555,13 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     emptyString(StringType, "")
     emptyString(BinaryType, "".getBytes(StandardCharsets.UTF_8))
   }
+
+  test("do not produce empty files for empty partitions") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.emptyDataset[String].write.json(path)
+      val files = new File(path).listFiles()
+      assert(!files.exists(_.getName.endsWith("json")))
+    }
+  }
 }

@@ -233,4 +233,13 @@ class TextSuite extends QueryTest with SharedSQLContext {
     assert(data(3) == Row("\"doh\""))
     assert(data.length == 4)
   }
+
+  test("do not produce empty files for empty partitions") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.emptyDataset[String].write.text(path)
+      val files = new File(path).listFiles()
+      assert(!files.exists(_.getName.endsWith("txt")))
+    }
+  }
 }