[SPARK-21170][CORE] Utils.tryWithSafeFinallyAndFailureCallbacks throws IllegalArgumentException: Self-suppression not permitted

## What changes were proposed in this pull request?

Not adding the exception to the suppressed if it is the same instance as originalThrowable.

## How was this patch tested?

Added new tests to verify this, these tests fail without source code changes and passes with the change.

Author: Devaraj K <devaraj@apache.org>

Closes #18384 from devaraj-kavali/SPARK-21170.
This commit is contained in:
Devaraj K 2017-07-01 15:53:49 +01:00 committed by Sean Owen
parent e0b047eafe
commit 6beca9ce94
2 changed files with 99 additions and 19 deletions

View file

@ -1348,14 +1348,10 @@ private[spark] object Utils extends Logging {
try {
finallyBlock
} catch {
case t: Throwable =>
if (originalThrowable != null) {
originalThrowable.addSuppressed(t)
logWarning(s"Suppressing exception in finally: " + t.getMessage, t)
throw originalThrowable
} else {
throw t
}
case t: Throwable if (originalThrowable != null && originalThrowable != t) =>
originalThrowable.addSuppressed(t)
logWarning(s"Suppressing exception in finally: ${t.getMessage}", t)
throw originalThrowable
}
}
}
@ -1387,22 +1383,20 @@ private[spark] object Utils extends Logging {
catchBlock
} catch {
case t: Throwable =>
originalThrowable.addSuppressed(t)
logWarning(s"Suppressing exception in catch: " + t.getMessage, t)
if (originalThrowable != t) {
originalThrowable.addSuppressed(t)
logWarning(s"Suppressing exception in catch: ${t.getMessage}", t)
}
}
throw originalThrowable
} finally {
try {
finallyBlock
} catch {
case t: Throwable =>
if (originalThrowable != null) {
originalThrowable.addSuppressed(t)
logWarning(s"Suppressing exception in finally: " + t.getMessage, t)
throw originalThrowable
} else {
throw t
}
case t: Throwable if (originalThrowable != null && originalThrowable != t) =>
originalThrowable.addSuppressed(t)
logWarning(s"Suppressing exception in finally: ${t.getMessage}", t)
throw originalThrowable
}
}
}

View file

@ -38,7 +38,7 @@ import org.apache.commons.math3.stat.inference.ChiSquareTest
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
@ -1024,4 +1024,90 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(redactedConf("spark.sensitive.property") === Utils.REDACTION_REPLACEMENT_TEXT)
}
test("tryWithSafeFinally") {
var e = new Error("Block0")
val finallyBlockError = new Error("Finally Block")
var isErrorOccurred = false
// if the try and finally blocks throw different exception instances
try {
Utils.tryWithSafeFinally { throw e }(finallyBlock = { throw finallyBlockError })
} catch {
case t: Error =>
assert(t.getSuppressed.head == finallyBlockError)
isErrorOccurred = true
}
assert(isErrorOccurred)
// if the try and finally blocks throw the same exception instance then it should not
// try to add to suppressed and get IllegalArgumentException
e = new Error("Block1")
isErrorOccurred = false
try {
Utils.tryWithSafeFinally { throw e }(finallyBlock = { throw e })
} catch {
case t: Error =>
assert(t.getSuppressed.length == 0)
isErrorOccurred = true
}
assert(isErrorOccurred)
// if the try throws the exception and finally doesn't throw exception
e = new Error("Block2")
isErrorOccurred = false
try {
Utils.tryWithSafeFinally { throw e }(finallyBlock = {})
} catch {
case t: Error =>
assert(t.getSuppressed.length == 0)
isErrorOccurred = true
}
assert(isErrorOccurred)
// if the try and finally block don't throw exception
Utils.tryWithSafeFinally {}(finallyBlock = {})
}
test("tryWithSafeFinallyAndFailureCallbacks") {
var e = new Error("Block0")
val catchBlockError = new Error("Catch Block")
val finallyBlockError = new Error("Finally Block")
var isErrorOccurred = false
TaskContext.setTaskContext(TaskContext.empty())
// if the try, catch and finally blocks throw different exception instances
try {
Utils.tryWithSafeFinallyAndFailureCallbacks { throw e }(
catchBlock = { throw catchBlockError }, finallyBlock = { throw finallyBlockError })
} catch {
case t: Error =>
assert(t.getSuppressed.head == catchBlockError)
assert(t.getSuppressed.last == finallyBlockError)
isErrorOccurred = true
}
assert(isErrorOccurred)
// if the try, catch and finally blocks throw the same exception instance then it should not
// try to add to suppressed and get IllegalArgumentException
e = new Error("Block1")
isErrorOccurred = false
try {
Utils.tryWithSafeFinallyAndFailureCallbacks { throw e }(catchBlock = { throw e },
finallyBlock = { throw e })
} catch {
case t: Error =>
assert(t.getSuppressed.length == 0)
isErrorOccurred = true
}
assert(isErrorOccurred)
// if the try throws the exception, catch and finally don't throw exceptions
e = new Error("Block2")
isErrorOccurred = false
try {
Utils.tryWithSafeFinallyAndFailureCallbacks { throw e }(catchBlock = {}, finallyBlock = {})
} catch {
case t: Error =>
assert(t.getSuppressed.length == 0)
isErrorOccurred = true
}
assert(isErrorOccurred)
// if the try, catch and finally blocks don't throw exceptions
Utils.tryWithSafeFinallyAndFailureCallbacks {}(catchBlock = {}, finallyBlock = {})
TaskContext.unset
}
}