[SPARK-16104] [SQL] Do not create CSV writer object for every flush when writing
## What changes were proposed in this pull request? This PR lets the `CsvWriter` object be reused instead of being created each time. This approach was adopted from the JSON data source. Originally, a `CsvWriter` was created for each row, but that was improved in https://github.com/apache/spark/pull/13229. However, a `CsvWriter` object is still created for each `flush()` in `LineCsvWriter`. It does not seem necessary to close the object and re-create it on every flush. The original logic is kept as-is, but the `CsvWriter` is now reused by resetting the `CharArrayWriter`. ## How was this patch tested? Existing tests should cover this. Author: hyukjinkwon <gurwls223@gmail.com> Closes #13809 from HyukjinKwon/write-perf.
This commit is contained in:
parent
d77c4e6e2e
commit
7580f3041a
|
@ -17,8 +17,7 @@
|
||||||
|
|
||||||
package org.apache.spark.sql.execution.datasources.csv
|
package org.apache.spark.sql.execution.datasources.csv
|
||||||
|
|
||||||
import java.io.{ByteArrayOutputStream, OutputStreamWriter, StringReader}
|
import java.io.{CharArrayWriter, StringReader}
|
||||||
import java.nio.charset.StandardCharsets
|
|
||||||
|
|
||||||
import com.univocity.parsers.csv._
|
import com.univocity.parsers.csv._
|
||||||
|
|
||||||
|
@ -77,10 +76,8 @@ private[sql] class LineCsvWriter(params: CSVOptions, headers: Seq[String]) exten
|
||||||
writerSettings.setHeaders(headers: _*)
|
writerSettings.setHeaders(headers: _*)
|
||||||
writerSettings.setQuoteEscapingEnabled(params.escapeQuotes)
|
writerSettings.setQuoteEscapingEnabled(params.escapeQuotes)
|
||||||
|
|
||||||
private var buffer = new ByteArrayOutputStream()
|
private val buffer = new CharArrayWriter()
|
||||||
private var writer = new CsvWriter(
|
private val writer = new CsvWriter(buffer, writerSettings)
|
||||||
new OutputStreamWriter(buffer, StandardCharsets.UTF_8),
|
|
||||||
writerSettings)
|
|
||||||
|
|
||||||
def writeRow(row: Seq[String], includeHeader: Boolean): Unit = {
|
def writeRow(row: Seq[String], includeHeader: Boolean): Unit = {
|
||||||
if (includeHeader) {
|
if (includeHeader) {
|
||||||
|
@ -90,14 +87,15 @@ private[sql] class LineCsvWriter(params: CSVOptions, headers: Seq[String]) exten
|
||||||
}
|
}
|
||||||
|
|
||||||
def flush(): String = {
|
def flush(): String = {
|
||||||
writer.close()
|
writer.flush()
|
||||||
val lines = buffer.toString.stripLineEnd
|
val lines = buffer.toString.stripLineEnd
|
||||||
buffer = new ByteArrayOutputStream()
|
buffer.reset()
|
||||||
writer = new CsvWriter(
|
|
||||||
new OutputStreamWriter(buffer, StandardCharsets.UTF_8),
|
|
||||||
writerSettings)
|
|
||||||
lines
|
lines
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def close(): Unit = {
|
||||||
|
writer.close()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -223,6 +223,7 @@ private[sql] class CsvOutputWriter(
|
||||||
|
|
||||||
override def close(): Unit = {
|
override def close(): Unit = {
|
||||||
flush()
|
flush()
|
||||||
|
csvWriter.close()
|
||||||
recordWriter.close(context)
|
recordWriter.close(context)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue