[SPARK-27875][CORE][SQL][ML][K8S] Wrap all PrintWriter with Utils.tryWithResource

## What changes were proposed in this pull request?

This PR wraps all `PrintWriter` usages in `Utils.tryWithResource` to prevent resource leaks.
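
For background, `Utils.tryWithResource` runs the given block and closes the resource in a `finally` clause, so the writer is released even when the block throws; the manual `close()` calls it replaces were skipped on exceptions. Below is a minimal sketch of the before/after pattern (the standalone `tryWithResource` helper is a simplified stand-in for `org.apache.spark.util.Utils.tryWithResource`, and `TryWithResourceSketch`/`writeData` are illustrative names, not part of this change):

```scala
import java.io.{Closeable, File, PrintWriter}

object TryWithResourceSketch {
  // Simplified stand-in for org.apache.spark.util.Utils.tryWithResource:
  // apply `f` to the resource and always close it afterwards.
  def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
    val resource = createResource
    try f(resource) finally resource.close()
  }

  // Before: the writer leaked if write() threw.
  //   val writer = new PrintWriter(file)
  //   writer.write(contents)
  //   writer.close()
  // After: the writer is closed on both the success and failure paths.
  def writeData(file: File, contents: String): Unit =
    tryWithResource(new PrintWriter(file)) { writer =>
      writer.write(contents)
    }
}
```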

## How was this patch tested?

Existing tests.

Closes #24739 from wangyum/SPARK-27875.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
Yuming Wang 2019-05-30 19:54:32 +09:00 committed by HyukjinKwon
parent d0a5aea12e
commit db3e746b64
12 changed files with 88 additions and 83 deletions


@@ -42,9 +42,9 @@ class PythonBroadcastSuite extends SparkFunSuite with Matchers with SharedSparkC
     withTempDir { tempDir =>
       val broadcastDataFile: File = {
         val file = new File(tempDir, "broadcastData")
-        val printWriter = new PrintWriter(file)
-        printWriter.write(broadcastedString)
-        printWriter.close()
+        Utils.tryWithResource(new PrintWriter(file)) { printWriter =>
+          printWriter.write(broadcastedString)
+        }
         file
       }
       val broadcast = new PythonBroadcast(broadcastDataFile.getAbsolutePath)


@@ -749,10 +749,10 @@ class SparkSubmitSuite
     withTempDir { tmpDir =>
       // Test jars and files
       val f1 = File.createTempFile("test-submit-jars-files", "", tmpDir)
-      val writer1 = new PrintWriter(f1)
-      writer1.println("spark.jars " + jars)
-      writer1.println("spark.files " + files)
-      writer1.close()
+      Utils.tryWithResource(new PrintWriter(f1)) { writer =>
+        writer.println("spark.jars " + jars)
+        writer.println("spark.files " + files)
+      }
       val clArgs = Seq(
         "--master", "local",
         "--class", "org.SomeClass",
@@ -766,10 +766,10 @@ class SparkSubmitSuite
       // Test files and archives (Yarn)
       val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir)
-      val writer2 = new PrintWriter(f2)
-      writer2.println("spark.yarn.dist.files " + files)
-      writer2.println("spark.yarn.dist.archives " + archives)
-      writer2.close()
+      Utils.tryWithResource(new PrintWriter(f2)) { writer =>
+        writer.println("spark.yarn.dist.files " + files)
+        writer.println("spark.yarn.dist.archives " + archives)
+      }
       val clArgs2 = Seq(
         "--master", "yarn",
         "--class", "org.SomeClass",
@@ -783,9 +783,9 @@ class SparkSubmitSuite
       // Test python files
       val f3 = File.createTempFile("test-submit-python-files", "", tmpDir)
-      val writer3 = new PrintWriter(f3)
-      writer3.println("spark.submit.pyFiles " + pyFiles)
-      writer3.close()
+      Utils.tryWithResource(new PrintWriter(f3)) { writer =>
+        writer.println("spark.submit.pyFiles " + pyFiles)
+      }
       val clArgs3 = Seq(
         "--master", "local",
         "--properties-file", f3.getPath,
@@ -802,10 +802,10 @@ class SparkSubmitSuite
       val f4 = File.createTempFile("test-submit-remote-python-files", "", tmpDir)
       val pyFile1 = File.createTempFile("file1", ".py", tmpDir)
       val pyFile2 = File.createTempFile("file2", ".py", tmpDir)
-      val writer4 = new PrintWriter(f4)
       val remotePyFiles = s"s3a://${pyFile1.getAbsolutePath},s3a://${pyFile2.getAbsolutePath}"
-      writer4.println("spark.submit.pyFiles " + remotePyFiles)
-      writer4.close()
+      Utils.tryWithResource(new PrintWriter(f4)) { writer =>
+        writer.println("spark.submit.pyFiles " + remotePyFiles)
+      }
       val clArgs4 = Seq(
         "--master", "yarn",
         "--deploy-mode", "cluster",


@@ -770,11 +770,11 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     // write out one totally bogus hidden file
     val hiddenGarbageFile = new File(testDir, ".garbage")
-    val out = new PrintWriter(hiddenGarbageFile)
-    // scalastyle:off println
-    out.println("GARBAGE")
-    // scalastyle:on println
-    out.close()
+    Utils.tryWithResource(new PrintWriter(hiddenGarbageFile)) { out =>
+      // scalastyle:off println
+      out.println("GARBAGE")
+      // scalastyle:on println
+    }
     // also write out one real event log file, but since its a hidden file, we shouldn't read it
     val tmpNewAppFile = newLogFile("hidden", None, inProgress = false)


@@ -51,13 +51,13 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
     testTempDir.mkdir()
     tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt")
-    val pw = new PrintWriter(new FileWriter(tmpFile))
-    for (x <- 1 to numRecords) {
-      // scalastyle:off println
-      pw.println(RandomUtils.nextInt(0, numBuckets))
-      // scalastyle:on println
+    Utils.tryWithResource(new PrintWriter(tmpFile)) { pw =>
+      for (x <- 1 to numRecords) {
+        // scalastyle:off println
+        pw.println(RandomUtils.nextInt(0, numBuckets))
+        // scalastyle:on println
+      }
     }
-    pw.close()
     // Path to tmpFile
     tmpFilePath = tmpFile.toURI.toString


@@ -50,15 +50,15 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
   test("Simple replay") {
     val logFilePath = getFilePath(testDir, "events.txt")
     val fstream = fileSystem.create(logFilePath)
-    val writer = new PrintWriter(fstream)
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
       125L, "Mickey", None)
     val applicationEnd = SparkListenerApplicationEnd(1000L)
-    // scalastyle:off println
-    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
-    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
-    // scalastyle:on println
-    writer.close()
+    Utils.tryWithResource(new PrintWriter(fstream)) { writer =>
+      // scalastyle:off println
+      writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+      writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+      // scalastyle:on println
+    }
     val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
     val logData = fileSystem.open(logFilePath)
@@ -132,16 +132,16 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
   test("Replay incompatible event log") {
     val logFilePath = getFilePath(testDir, "incompatible.txt")
     val fstream = fileSystem.create(logFilePath)
-    val writer = new PrintWriter(fstream)
     val applicationStart = SparkListenerApplicationStart("Incompatible App", None,
       125L, "UserUsingIncompatibleVersion", None)
     val applicationEnd = SparkListenerApplicationEnd(1000L)
-    // scalastyle:off println
-    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
-    writer.println("""{"Event":"UnrecognizedEventOnlyForTest","Timestamp":1477593059313}""")
-    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
-    // scalastyle:on println
-    writer.close()
+    Utils.tryWithResource(new PrintWriter(fstream)) { writer =>
+      // scalastyle:off println
+      writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+      writer.println("""{"Event":"UnrecognizedEventOnlyForTest","Timestamp":1477593059313}""")
+      writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+      // scalastyle:on println
+    }
     val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
     val logData = fileSystem.open(logFilePath)


@@ -22,6 +22,8 @@ import java.io.PrintWriter
 import scala.reflect.ClassTag
 import scala.xml.Utility
+import org.apache.spark.util.Utils
 /**
  * Code generator for shared params (sharedParams.scala). Run under the Spark folder with
  * {{{
@@ -103,9 +105,9 @@ private[shared] object SharedParamsCodeGen {
     val code = genSharedParams(params)
     val file = "src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala"
-    val writer = new PrintWriter(file)
-    writer.write(code)
-    writer.close()
+    Utils.tryWithResource(new PrintWriter(file)) { writer =>
+      writer.write(code)
+    }
   }
   /** Description of a param. */


@@ -23,6 +23,7 @@ import io.fabric8.kubernetes.api.model.ConfigMap
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
+import org.apache.spark.util.Utils
 class PodTemplateConfigMapStepSuite extends SparkFunSuite {
@@ -46,9 +47,9 @@ class PodTemplateConfigMapStepSuite extends SparkFunSuite {
       .set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, templateFile.getAbsolutePath)
     val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
-    val writer = new PrintWriter(templateFile)
-    writer.write("pod-template-contents")
-    writer.close()
+    Utils.tryWithResource(new PrintWriter(templateFile)) { writer =>
+      writer.write("pod-template-contents")
+    }
     val step = new PodTemplateConfigMapStep(kubernetesConf)
     val configuredPod = step.configurePod(SparkPod.initialPod())


@@ -93,9 +93,9 @@ package object util extends Logging {
   }
   def stringToFile(file: File, str: String): File = {
-    val out = new PrintWriter(file)
-    out.write(str)
-    out.close()
+    Utils.tryWithResource(new PrintWriter(file)) { out =>
+      out.write(str)
+    }
     file
   }
@@ -115,9 +115,10 @@ package object util extends Logging {
   def stackTraceToString(t: Throwable): String = {
     val out = new java.io.ByteArrayOutputStream
-    val writer = new PrintWriter(out)
-    t.printStackTrace(writer)
-    writer.flush()
+    Utils.tryWithResource(new PrintWriter(out)) { writer =>
+      t.printStackTrace(writer)
+      writer.flush()
+    }
     new String(out.toByteArray, StandardCharsets.UTF_8)
   }


@@ -2720,14 +2720,14 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   test("Refresh table before drop database cascade") {
     withTempDir { tempDir =>
       val file1 = new File(tempDir + "/first.csv")
-      val writer1 = new PrintWriter(file1)
-      writer1.write("first")
-      writer1.close()
+      Utils.tryWithResource(new PrintWriter(file1)) { writer =>
+        writer.write("first")
+      }
       val file2 = new File(tempDir + "/second.csv")
-      val writer2 = new PrintWriter(file2)
-      writer2.write("second")
-      writer2.close()
+      Utils.tryWithResource(new PrintWriter(file2)) { writer =>
+        writer.write("second")
+      }
       withDatabase("foo") {
         withTable("foo.first") {


@@ -40,6 +40,7 @@ import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
@@ -77,14 +78,14 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     withTempDir { tempDir =>
       // EXTERNAL OpenCSVSerde table pointing to LOCATION
       val file1 = new File(tempDir + "/data1")
-      val writer1 = new PrintWriter(file1)
-      writer1.write("1,2")
-      writer1.close()
+      Utils.tryWithResource(new PrintWriter(file1)) { writer =>
+        writer.write("1,2")
+      }
       val file2 = new File(tempDir + "/data2")
-      val writer2 = new PrintWriter(file2)
-      writer2.write("1,2")
-      writer2.close()
+      Utils.tryWithResource(new PrintWriter(file2)) { writer =>
+        writer.write("1,2")
+      }
       sql(
         s"""
@@ -957,9 +958,9 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
         withTempDir { loadPath =>
           // load data command
           val file = new File(loadPath + "/data")
-          val writer = new PrintWriter(file)
-          writer.write("2,xyz")
-          writer.close()
+          Utils.tryWithResource(new PrintWriter(file)) { writer =>
+            writer.write("2,xyz")
+          }
           sql(s"LOAD DATA INPATH '${loadPath.toURI.toString}' INTO TABLE $table")
           if (autoUpdate) {
             val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
@@ -994,14 +995,14 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
         withTempPaths(numPaths = 2) { case Seq(dir1, dir2) =>
           val file1 = new File(dir1 + "/data")
-          val writer1 = new PrintWriter(file1)
-          writer1.write("1,a")
-          writer1.close()
+          Utils.tryWithResource(new PrintWriter(file1)) { writer =>
+            writer.write("1,a")
+          }
           val file2 = new File(dir2 + "/data")
-          val writer2 = new PrintWriter(file2)
-          writer2.write("1,a")
-          writer2.close()
+          Utils.tryWithResource(new PrintWriter(file2)) { writer =>
+            writer.write("1,a")
+          }
           // add partition command
           sql(


@@ -949,9 +949,9 @@ class VersionsSuite extends SparkFunSuite with Logging {
          |}
        """.stripMargin
      val schemaFile = new File(dir, "avroDecimal.avsc")
-     val writer = new PrintWriter(schemaFile)
-     writer.write(avroSchema)
-     writer.close()
+     Utils.tryWithResource(new PrintWriter(schemaFile)) { writer =>
+       writer.write(avroSchema)
+     }
      val schemaPath = schemaFile.toURI.toString
      val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal")


@@ -454,14 +454,14 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
       // EXTERNAL OpenCSVSerde table pointing to LOCATION
       val file1 = new File(tempDir + "/data1")
-      val writer1 = new PrintWriter(file1)
-      writer1.write("1,2")
-      writer1.close()
+      Utils.tryWithResource(new PrintWriter(file1)) { writer =>
+        writer.write("1,2")
+      }
       val file2 = new File(tempDir + "/data2")
-      val writer2 = new PrintWriter(file2)
-      writer2.write("1,2")
-      writer2.close()
+      Utils.tryWithResource(new PrintWriter(file2)) { writer =>
+        writer.write("1,2")
+      }
       sql(
         s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)