[SPARK-26081][SQL] Prevent empty files for empty partitions in Text datasources
## What changes were proposed in this pull request? In the PR, I propose to postpone creation of `OutputStream`/`Univocity`/`JacksonGenerator` till the first row should be written. This prevents creation of empty files for empty partitions. So, no need to open and to read such files back while loading data from the location. ## How was this patch tested? Added tests for Text, JSON and CSV datasource where empty dataset is written but should not produce any files. Closes #23052 from MaxGekk/text-empty-files. Lead-authored-by: Maxim Gekk <max.gekk@gmail.com> Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com> Signed-off-by: Sean Owen <sean.owen@databricks.com>
This commit is contained in:
parent
9a09e91a3e
commit
31c4fab3fb
|
@ -169,13 +169,19 @@ private[csv] class CsvOutputWriter(
|
|||
context: TaskAttemptContext,
|
||||
params: CSVOptions) extends OutputWriter with Logging {
|
||||
|
||||
private val charset = Charset.forName(params.charset)
|
||||
private var univocityGenerator: Option[UnivocityGenerator] = None
|
||||
|
||||
private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
|
||||
|
||||
private val gen = new UnivocityGenerator(dataSchema, writer, params)
|
||||
|
||||
override def write(row: InternalRow): Unit = gen.write(row)
|
||||
|
||||
override def close(): Unit = gen.close()
|
||||
override def write(row: InternalRow): Unit = {
|
||||
val gen = univocityGenerator.getOrElse {
|
||||
val charset = Charset.forName(params.charset)
|
||||
val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
|
||||
val newGen = new UnivocityGenerator(dataSchema, os, params)
|
||||
univocityGenerator = Some(newGen)
|
||||
newGen
|
||||
}
|
||||
|
||||
gen.write(row)
|
||||
}
|
||||
|
||||
override def close(): Unit = univocityGenerator.map(_.close())
|
||||
}
|
||||
|
|
|
@ -175,19 +175,20 @@ private[json] class JsonOutputWriter(
|
|||
" which can be read back by Spark only if multiLine is enabled.")
|
||||
}
|
||||
|
||||
private val writer = CodecStreams.createOutputStreamWriter(
|
||||
context, new Path(path), encoding)
|
||||
|
||||
// create the Generator without separator inserted between 2 records
|
||||
private[this] val gen = new JacksonGenerator(dataSchema, writer, options)
|
||||
private var jacksonGenerator: Option[JacksonGenerator] = None
|
||||
|
||||
override def write(row: InternalRow): Unit = {
|
||||
val gen = jacksonGenerator.getOrElse {
|
||||
val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
|
||||
// create the Generator without separator inserted between 2 records
|
||||
val newGen = new JacksonGenerator(dataSchema, os, options)
|
||||
jacksonGenerator = Some(newGen)
|
||||
newGen
|
||||
}
|
||||
|
||||
gen.write(row)
|
||||
gen.writeLineEnding()
|
||||
}
|
||||
|
||||
override def close(): Unit = {
|
||||
gen.close()
|
||||
writer.close()
|
||||
}
|
||||
override def close(): Unit = jacksonGenerator.map(_.close())
|
||||
}
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.spark.sql.execution.datasources.text
|
||||
|
||||
import java.io.OutputStream
|
||||
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.{FileStatus, Path}
|
||||
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
|
||||
|
@ -148,17 +150,23 @@ class TextOutputWriter(
|
|||
context: TaskAttemptContext)
|
||||
extends OutputWriter {
|
||||
|
||||
private val writer = CodecStreams.createOutputStream(context, new Path(path))
|
||||
private var outputStream: Option[OutputStream] = None
|
||||
|
||||
override def write(row: InternalRow): Unit = {
|
||||
val os = outputStream.getOrElse{
|
||||
val newStream = CodecStreams.createOutputStream(context, new Path(path))
|
||||
outputStream = Some(newStream)
|
||||
newStream
|
||||
}
|
||||
|
||||
if (!row.isNullAt(0)) {
|
||||
val utf8string = row.getUTF8String(0)
|
||||
utf8string.writeTo(writer)
|
||||
utf8string.writeTo(os)
|
||||
}
|
||||
writer.write(lineSeparator)
|
||||
os.write(lineSeparator)
|
||||
}
|
||||
|
||||
override def close(): Unit = {
|
||||
writer.close()
|
||||
outputStream.map(_.close())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1986,4 +1986,13 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
|
|||
}.getMessage
|
||||
assert(errMsg2.contains("'lineSep' can contain only 1 character"))
|
||||
}
|
||||
|
||||
test("do not produce empty files for empty partitions") {
|
||||
withTempPath { dir =>
|
||||
val path = dir.getCanonicalPath
|
||||
spark.emptyDataset[String].write.csv(path)
|
||||
val files = new File(path).listFiles()
|
||||
assert(!files.exists(_.getName.endsWith("csv")))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1897,7 +1897,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
|
|||
.text(path)
|
||||
|
||||
val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
|
||||
assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
|
||||
assert(jsonDF.count() === corruptRecordCount)
|
||||
assert(jsonDF.schema === new StructType()
|
||||
.add("_corrupt_record", StringType)
|
||||
.add("dummy", StringType))
|
||||
|
@ -1910,7 +1910,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
|
|||
F.count($"dummy").as("valid"),
|
||||
F.count($"_corrupt_record").as("corrupt"),
|
||||
F.count("*").as("count"))
|
||||
checkAnswer(counts, Row(1, 5, 7)) // null row for empty file
|
||||
checkAnswer(counts, Row(1, 4, 6)) // null row for empty file
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2555,4 +2555,13 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
|
|||
emptyString(StringType, "")
|
||||
emptyString(BinaryType, "".getBytes(StandardCharsets.UTF_8))
|
||||
}
|
||||
|
||||
test("do not produce empty files for empty partitions") {
|
||||
withTempPath { dir =>
|
||||
val path = dir.getCanonicalPath
|
||||
spark.emptyDataset[String].write.json(path)
|
||||
val files = new File(path).listFiles()
|
||||
assert(!files.exists(_.getName.endsWith("json")))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -233,4 +233,13 @@ class TextSuite extends QueryTest with SharedSQLContext {
|
|||
assert(data(3) == Row("\"doh\""))
|
||||
assert(data.length == 4)
|
||||
}
|
||||
|
||||
test("do not produce empty files for empty partitions") {
|
||||
withTempPath { dir =>
|
||||
val path = dir.getCanonicalPath
|
||||
spark.emptyDataset[String].write.text(path)
|
||||
val files = new File(path).listFiles()
|
||||
assert(!files.exists(_.getName.endsWith("txt")))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue